Lines Matching refs:con

119 static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
123 clear_bit(con_flag, &con->flags);
126 static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
130 set_bit(con_flag, &con->flags);
133 static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
137 return test_bit(con_flag, &con->flags);
140 static bool con_flag_test_and_clear(struct ceph_connection *con,
145 return test_and_clear_bit(con_flag, &con->flags);
148 static bool con_flag_test_and_set(struct ceph_connection *con,
153 return test_and_set_bit(con_flag, &con->flags);
170 static void queue_con(struct ceph_connection *con);
171 static void cancel_con(struct ceph_connection *con);
173 static void con_fault(struct ceph_connection *con);
302 static void con_sock_state_init(struct ceph_connection *con)
306 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
309 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
313 static void con_sock_state_connecting(struct ceph_connection *con)
317 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
320 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
324 static void con_sock_state_connected(struct ceph_connection *con)
328 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
331 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
335 static void con_sock_state_closing(struct ceph_connection *con)
339 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
344 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
348 static void con_sock_state_closed(struct ceph_connection *con)
352 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
358 dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
369 struct ceph_connection *con = sk->sk_user_data;
370 if (atomic_read(&con->msgr->stopping)) {
376 con, con->state);
377 queue_con(con);
384 struct ceph_connection *con = sk->sk_user_data;
393 if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
395 dout("%s %p queueing write work\n", __func__, con);
397 queue_con(con);
400 dout("%s %p nothing to write\n", __func__, con);
407 struct ceph_connection *con = sk->sk_user_data;
410 con, con->state, sk->sk_state);
418 con_sock_state_closing(con);
419 con_flag_set(con, CON_FLAG_SOCK_CLOSED);
420 queue_con(con);
424 con_sock_state_connected(con);
425 queue_con(con);
436 struct ceph_connection *con)
439 sk->sk_user_data = con;
453 static int ceph_tcp_connect(struct ceph_connection *con)
455 struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
460 BUG_ON(con->sock);
464 ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
475 set_sock_callbacks(sock, con);
477 dout("connect %s\n", ceph_pr_addr(&con->peer_addr));
479 con_sock_state_connecting(con);
484 ceph_pr_addr(&con->peer_addr),
488 ceph_pr_addr(&con->peer_addr), ret);
493 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
496 con->sock = sock;
593 static int con_close_socket(struct ceph_connection *con)
597 dout("con_close_socket on %p sock %p\n", con, con->sock);
598 if (con->sock) {
599 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
600 sock_release(con->sock);
601 con->sock = NULL;
610 con_flag_clear(con, CON_FLAG_SOCK_CLOSED);
612 con_sock_state_closed(con);
635 static void reset_connection(struct ceph_connection *con)
639 dout("reset_connection %p\n", con);
640 ceph_msg_remove_list(&con->out_queue);
641 ceph_msg_remove_list(&con->out_sent);
643 if (con->in_msg) {
644 BUG_ON(con->in_msg->con != con);
645 ceph_msg_put(con->in_msg);
646 con->in_msg = NULL;
649 con->connect_seq = 0;
650 con->out_seq = 0;
651 if (con->out_msg) {
652 BUG_ON(con->out_msg->con != con);
653 ceph_msg_put(con->out_msg);
654 con->out_msg = NULL;
656 con->in_seq = 0;
657 con->in_seq_acked = 0;
659 con->out_skip = 0;
665 void ceph_con_close(struct ceph_connection *con)
667 mutex_lock(&con->mutex);
668 dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
669 con->state = CON_STATE_CLOSED;
671 con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */
672 con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
673 con_flag_clear(con, CON_FLAG_WRITE_PENDING);
674 con_flag_clear(con, CON_FLAG_BACKOFF);
676 reset_connection(con);
677 con->peer_global_seq = 0;
678 cancel_con(con);
679 con_close_socket(con);
680 mutex_unlock(&con->mutex);
687 void ceph_con_open(struct ceph_connection *con,
691 mutex_lock(&con->mutex);
692 dout("con_open %p %s\n", con, ceph_pr_addr(addr));
694 WARN_ON(con->state != CON_STATE_CLOSED);
695 con->state = CON_STATE_PREOPEN;
697 con->peer_name.type = (__u8) entity_type;
698 con->peer_name.num = cpu_to_le64(entity_num);
700 memcpy(&con->peer_addr, addr, sizeof(*addr));
701 con->delay = 0; /* reset backoff memory */
702 mutex_unlock(&con->mutex);
703 queue_con(con);
710 bool ceph_con_opened(struct ceph_connection *con)
712 return con->connect_seq > 0;
718 void ceph_con_init(struct ceph_connection *con, void *private,
722 dout("con_init %p\n", con);
723 memset(con, 0, sizeof(*con));
724 con->private = private;
725 con->ops = ops;
726 con->msgr = msgr;
728 con_sock_state_init(con);
730 mutex_init(&con->mutex);
731 INIT_LIST_HEAD(&con->out_queue);
732 INIT_LIST_HEAD(&con->out_sent);
733 INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
735 con->state = CON_STATE_CLOSED;
756 static void con_out_kvec_reset(struct ceph_connection *con)
758 BUG_ON(con->out_skip);
760 con->out_kvec_left = 0;
761 con->out_kvec_bytes = 0;
762 con->out_kvec_cur = &con->out_kvec[0];
765 static void con_out_kvec_add(struct ceph_connection *con,
768 int index = con->out_kvec_left;
770 BUG_ON(con->out_skip);
771 BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
773 con->out_kvec[index].iov_len = size;
774 con->out_kvec[index].iov_base = data;
775 con->out_kvec_left++;
776 con->out_kvec_bytes += size;
784 static int con_out_kvec_skip(struct ceph_connection *con)
786 int off = con->out_kvec_cur - con->out_kvec;
789 if (con->out_kvec_bytes > 0) {
790 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
791 BUG_ON(con->out_kvec_bytes < skip);
792 BUG_ON(!con->out_kvec_left);
793 con->out_kvec_bytes -= skip;
794 con->out_kvec_left--;
1213 static size_t sizeof_footer(struct ceph_connection *con)
1215 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
1231 static void prepare_write_message_footer(struct ceph_connection *con)
1233 struct ceph_msg *m = con->out_msg;
1237 dout("prepare_write_message_footer %p\n", con);
1238 con_out_kvec_add(con, sizeof_footer(con), &m->footer);
1239 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1240 if (con->ops->sign_message)
1241 con->ops->sign_message(m);
1247 con->out_more = m->more_to_follow;
1248 con->out_msg_done = true;
1254 static void prepare_write_message(struct ceph_connection *con)
1259 con_out_kvec_reset(con);
1260 con->out_msg_done = false;
1264 if (con->in_seq > con->in_seq_acked) {
1265 con->in_seq_acked = con->in_seq;
1266 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
1267 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1268 con_out_kvec_add(con, sizeof (con->out_temp_ack),
1269 &con->out_temp_ack);
1272 BUG_ON(list_empty(&con->out_queue));
1273 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
1274 con->out_msg = m;
1275 BUG_ON(m->con != con);
1279 list_move_tail(&m->list_head, &con->out_sent);
1286 m->hdr.seq = cpu_to_le64(++con->out_seq);
1289 if (con->ops->reencode_message)
1290 con->ops->reencode_message(m);
1294 m, con->out_seq, le16_to_cpu(m->hdr.type),
1301 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
1302 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
1303 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
1306 con_out_kvec_add(con, m->middle->vec.iov_len,
1311 con->out_msg->hdr.crc = cpu_to_le32(crc);
1312 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
1316 con->out_msg->footer.front_crc = cpu_to_le32(crc);
1320 con->out_msg->footer.middle_crc = cpu_to_le32(crc);
1322 con->out_msg->footer.middle_crc = 0;
1324 le32_to_cpu(con->out_msg->footer.front_crc),
1325 le32_to_cpu(con->out_msg->footer.middle_crc));
1326 con->out_msg->footer.flags = 0;
1329 con->out_msg->footer.data_crc = 0;
1331 prepare_message_data(con->out_msg, m->data_length);
1332 con->out_more = 1; /* data + footer will follow */
1335 prepare_write_message_footer(con);
1338 con_flag_set(con, CON_FLAG_WRITE_PENDING);
1344 static void prepare_write_ack(struct ceph_connection *con)
1346 dout("prepare_write_ack %p %llu -> %llu\n", con,
1347 con->in_seq_acked, con->in_seq);
1348 con->in_seq_acked = con->in_seq;
1350 con_out_kvec_reset(con);
1352 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
1354 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1355 con_out_kvec_add(con, sizeof (con->out_temp_ack),
1356 &con->out_temp_ack);
1358 con->out_more = 1; /* more will follow.. eventually.. */
1359 con_flag_set(con, CON_FLAG_WRITE_PENDING);
1365 static void prepare_write_seq(struct ceph_connection *con)
1367 dout("prepare_write_seq %p %llu -> %llu\n", con,
1368 con->in_seq_acked, con->in_seq);
1369 con->in_seq_acked = con->in_seq;
1371 con_out_kvec_reset(con);
1373 con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
1374 con_out_kvec_add(con, sizeof (con->out_temp_ack),
1375 &con->out_temp_ack);
1377 con_flag_set(con, CON_FLAG_WRITE_PENDING);
1383 static void prepare_write_keepalive(struct ceph_connection *con)
1385 dout("prepare_write_keepalive %p\n", con);
1386 con_out_kvec_reset(con);
1387 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
1391 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
1392 ceph_encode_timespec64(&con->out_temp_keepalive2, &now);
1393 con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
1394 &con->out_temp_keepalive2);
1396 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
1398 con_flag_set(con, CON_FLAG_WRITE_PENDING);
1405 static int get_connect_authorizer(struct ceph_connection *con)
1410 if (!con->ops->get_authorizer) {
1411 con->auth = NULL;
1412 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
1413 con->out_connect.authorizer_len = 0;
1417 auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry);
1421 con->auth = auth;
1422 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
1423 con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len);
1430 static void prepare_write_banner(struct ceph_connection *con)
1432 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
1433 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
1434 &con->msgr->my_enc_addr);
1436 con->out_more = 0;
1437 con_flag_set(con, CON_FLAG_WRITE_PENDING);
1440 static void __prepare_write_connect(struct ceph_connection *con)
1442 con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect);
1443 if (con->auth)
1444 con_out_kvec_add(con, con->auth->authorizer_buf_len,
1445 con->auth->authorizer_buf);
1447 con->out_more = 0;
1448 con_flag_set(con, CON_FLAG_WRITE_PENDING);
1451 static int prepare_write_connect(struct ceph_connection *con)
1453 unsigned int global_seq = get_global_seq(con->msgr, 0);
1457 switch (con->peer_name.type) {
1471 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
1472 con->connect_seq, global_seq, proto);
1474 con->out_connect.features =
1475 cpu_to_le64(from_msgr(con->msgr)->supported_features);
1476 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
1477 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
1478 con->out_connect.global_seq = cpu_to_le32(global_seq);
1479 con->out_connect.protocol_version = cpu_to_le32(proto);
1480 con->out_connect.flags = 0;
1482 ret = get_connect_authorizer(con);
1486 __prepare_write_connect(con);
1496 static int write_partial_kvec(struct ceph_connection *con)
1500 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
1501 while (con->out_kvec_bytes > 0) {
1502 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
1503 con->out_kvec_left, con->out_kvec_bytes,
1504 con->out_more);
1507 con->out_kvec_bytes -= ret;
1508 if (con->out_kvec_bytes == 0)
1512 while (ret >= con->out_kvec_cur->iov_len) {
1513 BUG_ON(!con->out_kvec_left);
1514 ret -= con->out_kvec_cur->iov_len;
1515 con->out_kvec_cur++;
1516 con->out_kvec_left--;
1520 con->out_kvec_cur->iov_len -= ret;
1521 con->out_kvec_cur->iov_base += ret;
1524 con->out_kvec_left = 0;
1527 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
1528 con->out_kvec_bytes, con->out_kvec_left, ret);
1552 static int write_partial_message_data(struct ceph_connection *con)
1554 struct ceph_msg *msg = con->out_msg;
1556 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1560 dout("%s %p msg %p\n", __func__, con, msg);
1588 ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
1601 dout("%s %p msg %p done\n", __func__, con, msg);
1608 con_out_kvec_reset(con);
1609 prepare_write_message_footer(con);
1617 static int write_partial_skip(struct ceph_connection *con)
1622 dout("%s %p %d left\n", __func__, con, con->out_skip);
1623 while (con->out_skip > 0) {
1624 size_t size = min(con->out_skip, (int) PAGE_SIZE);
1626 if (size == con->out_skip)
1628 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, more);
1631 con->out_skip -= ret;
1641 static void prepare_read_banner(struct ceph_connection *con)
1643 dout("prepare_read_banner %p\n", con);
1644 con->in_base_pos = 0;
1647 static void prepare_read_connect(struct ceph_connection *con)
1649 dout("prepare_read_connect %p\n", con);
1650 con->in_base_pos = 0;
1653 static void prepare_read_ack(struct ceph_connection *con)
1655 dout("prepare_read_ack %p\n", con);
1656 con->in_base_pos = 0;
1659 static void prepare_read_seq(struct ceph_connection *con)
1661 dout("prepare_read_seq %p\n", con);
1662 con->in_base_pos = 0;
1663 con->in_tag = CEPH_MSGR_TAG_SEQ;
1666 static void prepare_read_tag(struct ceph_connection *con)
1668 dout("prepare_read_tag %p\n", con);
1669 con->in_base_pos = 0;
1670 con->in_tag = CEPH_MSGR_TAG_READY;
1673 static void prepare_read_keepalive_ack(struct ceph_connection *con)
1675 dout("prepare_read_keepalive_ack %p\n", con);
1676 con->in_base_pos = 0;
1682 static int prepare_read_message(struct ceph_connection *con)
1684 dout("prepare_read_message %p\n", con);
1685 BUG_ON(con->in_msg != NULL);
1686 con->in_base_pos = 0;
1687 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
1692 static int read_partial(struct ceph_connection *con,
1695 while (con->in_base_pos < end) {
1696 int left = end - con->in_base_pos;
1698 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
1701 con->in_base_pos += ret;
1710 static int read_partial_banner(struct ceph_connection *con)
1716 dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
1721 ret = read_partial(con, end, size, con->in_banner);
1725 size = sizeof (con->actual_peer_addr);
1727 ret = read_partial(con, end, size, &con->actual_peer_addr);
1730 ceph_decode_banner_addr(&con->actual_peer_addr);
1732 size = sizeof (con->peer_addr_for_me);
1734 ret = read_partial(con, end, size, &con->peer_addr_for_me);
1737 ceph_decode_banner_addr(&con->peer_addr_for_me);
1743 static int read_partial_connect(struct ceph_connection *con)
1749 dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
1751 size = sizeof (con->in_reply);
1753 ret = read_partial(con, end, size, &con->in_reply);
1757 if (con->auth) {
1758 size = le32_to_cpu(con->in_reply.authorizer_len);
1759 if (size > con->auth->authorizer_reply_buf_len) {
1761 con->auth->authorizer_reply_buf_len);
1767 ret = read_partial(con, end, size,
1768 con->auth->authorizer_reply_buf);
1774 con, (int)con->in_reply.tag,
1775 le32_to_cpu(con->in_reply.connect_seq),
1776 le32_to_cpu(con->in_reply.global_seq));
1784 static int verify_hello(struct ceph_connection *con)
1786 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1788 ceph_pr_addr(&con->peer_addr));
1789 con->error_msg = "protocol error, bad banner";
2003 static int process_banner(struct ceph_connection *con)
2005 dout("process_banner on %p\n", con);
2007 if (verify_hello(con) < 0)
2015 if (memcmp(&con->peer_addr, &con->actual_peer_addr,
2016 sizeof(con->peer_addr)) != 0 &&
2017 !(addr_is_blank(&con->actual_peer_addr) &&
2018 con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
2020 ceph_pr_addr(&con->peer_addr),
2021 le32_to_cpu(con->peer_addr.nonce),
2022 ceph_pr_addr(&con->actual_peer_addr),
2023 le32_to_cpu(con->actual_peer_addr.nonce));
2024 con->error_msg = "wrong peer at address";
2031 if (addr_is_blank(&con->msgr->inst.addr)) {
2032 int port = addr_port(&con->msgr->inst.addr);
2034 memcpy(&con->msgr->inst.addr.in_addr,
2035 &con->peer_addr_for_me.in_addr,
2036 sizeof(con->peer_addr_for_me.in_addr));
2037 addr_set_port(&con->msgr->inst.addr, port);
2038 encode_my_addr(con->msgr);
2040 ceph_pr_addr(&con->msgr->inst.addr));
2046 static int process_connect(struct ceph_connection *con)
2048 u64 sup_feat = from_msgr(con->msgr)->supported_features;
2049 u64 req_feat = from_msgr(con->msgr)->required_features;
2050 u64 server_feat = le64_to_cpu(con->in_reply.features);
2053 dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
2055 if (con->auth) {
2056 int len = le32_to_cpu(con->in_reply.authorizer_len);
2065 if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
2066 ret = con->ops->add_authorizer_challenge(
2067 con, con->auth->authorizer_reply_buf, len);
2071 con_out_kvec_reset(con);
2072 __prepare_write_connect(con);
2073 prepare_read_connect(con);
2078 ret = con->ops->verify_authorizer_reply(con);
2080 con->error_msg = "bad authorize reply";
2086 switch (con->in_reply.tag) {
2090 ENTITY_NAME(con->peer_name),
2091 ceph_pr_addr(&con->peer_addr),
2093 con->error_msg = "missing required protocol features";
2094 reset_connection(con);
2100 ENTITY_NAME(con->peer_name),
2101 ceph_pr_addr(&con->peer_addr),
2102 le32_to_cpu(con->out_connect.protocol_version),
2103 le32_to_cpu(con->in_reply.protocol_version));
2104 con->error_msg = "protocol version mismatch";
2105 reset_connection(con);
2109 con->auth_retry++;
2110 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
2111 con->auth_retry);
2112 if (con->auth_retry == 2) {
2113 con->error_msg = "connect authorization failure";
2116 con_out_kvec_reset(con);
2117 ret = prepare_write_connect(con);
2120 prepare_read_connect(con);
2132 le32_to_cpu(con->in_reply.connect_seq));
2134 ENTITY_NAME(con->peer_name),
2135 ceph_pr_addr(&con->peer_addr));
2136 reset_connection(con);
2137 con_out_kvec_reset(con);
2138 ret = prepare_write_connect(con);
2141 prepare_read_connect(con);
2144 mutex_unlock(&con->mutex);
2145 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
2146 if (con->ops->peer_reset)
2147 con->ops->peer_reset(con);
2148 mutex_lock(&con->mutex);
2149 if (con->state != CON_STATE_NEGOTIATING)
2159 le32_to_cpu(con->out_connect.connect_seq),
2160 le32_to_cpu(con->in_reply.connect_seq));
2161 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
2162 con_out_kvec_reset(con);
2163 ret = prepare_write_connect(con);
2166 prepare_read_connect(con);
2175 con->peer_global_seq,
2176 le32_to_cpu(con->in_reply.global_seq));
2177 get_global_seq(con->msgr,
2178 le32_to_cpu(con->in_reply.global_seq));
2179 con_out_kvec_reset(con);
2180 ret = prepare_write_connect(con);
2183 prepare_read_connect(con);
2191 ENTITY_NAME(con->peer_name),
2192 ceph_pr_addr(&con->peer_addr),
2194 con->error_msg = "missing required protocol features";
2195 reset_connection(con);
2199 WARN_ON(con->state != CON_STATE_NEGOTIATING);
2200 con->state = CON_STATE_OPEN;
2201 con->auth_retry = 0; /* we authenticated; clear flag */
2202 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
2203 con->connect_seq++;
2204 con->peer_features = server_feat;
2206 con->peer_global_seq,
2207 le32_to_cpu(con->in_reply.connect_seq),
2208 con->connect_seq);
2209 WARN_ON(con->connect_seq !=
2210 le32_to_cpu(con->in_reply.connect_seq));
2212 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
2213 con_flag_set(con, CON_FLAG_LOSSYTX);
2215 con->delay = 0; /* reset backoff memory */
2217 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
2218 prepare_write_seq(con);
2219 prepare_read_seq(con);
2221 prepare_read_tag(con);
2232 con->error_msg = "protocol error, got WAIT as client";
2236 con->error_msg = "protocol error, garbage tag during connect";
2246 static int read_partial_ack(struct ceph_connection *con)
2248 int size = sizeof (con->in_temp_ack);
2251 return read_partial(con, end, size, &con->in_temp_ack);
2257 static void process_ack(struct ceph_connection *con)
2260 u64 ack = le64_to_cpu(con->in_temp_ack);
2262 bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ);
2263 struct list_head *list = reconnect ? &con->out_queue : &con->out_sent;
2283 prepare_read_tag(con);
2287 static int read_partial_message_section(struct ceph_connection *con,
2298 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
2310 static int read_partial_msg_data(struct ceph_connection *con)
2312 struct ceph_msg *msg = con->in_msg;
2314 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
2325 crc = con->in_data_crc;
2333 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2336 con->in_data_crc = crc;
2346 con->in_data_crc = crc;
2354 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
2356 static int read_partial_message(struct ceph_connection *con)
2358 struct ceph_msg *m = con->in_msg;
2363 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
2364 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
2368 dout("read_partial_message con %p msg %p\n", con, m);
2371 size = sizeof (con->in_hdr);
2373 ret = read_partial(con, end, size, &con->in_hdr);
2377 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
2378 if (cpu_to_le32(crc) != con->in_hdr.crc) {
2380 crc, con->in_hdr.crc);
2384 front_len = le32_to_cpu(con->in_hdr.front_len);
2387 middle_len = le32_to_cpu(con->in_hdr.middle_len);
2390 data_len = le32_to_cpu(con->in_hdr.data_len);
2395 seq = le64_to_cpu(con->in_hdr.seq);
2396 if ((s64)seq - (s64)con->in_seq < 1) {
2398 ENTITY_NAME(con->peer_name),
2399 ceph_pr_addr(&con->peer_addr),
2400 seq, con->in_seq + 1);
2401 con->in_base_pos = -front_len - middle_len - data_len -
2402 sizeof_footer(con);
2403 con->in_tag = CEPH_MSGR_TAG_READY;
2405 } else if ((s64)seq - (s64)con->in_seq > 1) {
2407 seq, con->in_seq + 1);
2408 con->error_msg = "bad message sequence # for incoming message";
2413 if (!con->in_msg) {
2416 dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
2418 ret = ceph_con_in_msg_alloc(con, &skip);
2422 BUG_ON(!con->in_msg ^ skip);
2426 con->in_base_pos = -front_len - middle_len - data_len -
2427 sizeof_footer(con);
2428 con->in_tag = CEPH_MSGR_TAG_READY;
2429 con->in_seq++;
2433 BUG_ON(!con->in_msg);
2434 BUG_ON(con->in_msg->con != con);
2435 m = con->in_msg;
2443 prepare_message_data(con->in_msg, data_len);
2447 ret = read_partial_message_section(con, &m->front, front_len,
2448 &con->in_front_crc);
2454 ret = read_partial_message_section(con, &m->middle->vec,
2456 &con->in_middle_crc);
2463 ret = read_partial_msg_data(con);
2469 size = sizeof_footer(con);
2471 ret = read_partial(con, end, size, &m->footer);
2485 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
2487 m, con->in_front_crc, m->footer.front_crc);
2490 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
2492 m, con->in_middle_crc, m->footer.middle_crc);
2497 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
2499 con->in_data_crc, le32_to_cpu(m->footer.data_crc));
2503 if (need_sign && con->ops->check_message_signature &&
2504 con->ops->check_message_signature(m)) {
2517 static void process_message(struct ceph_connection *con)
2519 struct ceph_msg *msg = con->in_msg;
2521 BUG_ON(con->in_msg->con != con);
2522 con->in_msg = NULL;
2525 if (con->peer_name.type == 0)
2526 con->peer_name = msg->hdr.src;
2528 con->in_seq++;
2529 mutex_unlock(&con->mutex);
2538 con->in_front_crc, con->in_middle_crc, con->in_data_crc);
2539 con->ops->dispatch(con, msg);
2541 mutex_lock(&con->mutex);
2544 static int read_keepalive_ack(struct ceph_connection *con)
2548 int ret = read_partial(con, size, size, &ceph_ts);
2551 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
2552 prepare_read_tag(con);
2560 static int try_write(struct ceph_connection *con)
2564 dout("try_write start %p state %lu\n", con, con->state);
2565 if (con->state != CON_STATE_PREOPEN &&
2566 con->state != CON_STATE_CONNECTING &&
2567 con->state != CON_STATE_NEGOTIATING &&
2568 con->state != CON_STATE_OPEN)
2572 if (con->state == CON_STATE_PREOPEN) {
2573 BUG_ON(con->sock);
2574 con->state = CON_STATE_CONNECTING;
2576 con_out_kvec_reset(con);
2577 prepare_write_banner(con);
2578 prepare_read_banner(con);
2580 BUG_ON(con->in_msg);
2581 con->in_tag = CEPH_MSGR_TAG_READY;
2583 con, con->state);
2584 ret = ceph_tcp_connect(con);
2586 con->error_msg = "connect error";
2592 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
2593 BUG_ON(!con->sock);
2596 if (con->out_kvec_left) {
2597 ret = write_partial_kvec(con);
2601 if (con->out_skip) {
2602 ret = write_partial_skip(con);
2608 if (con->out_msg) {
2609 if (con->out_msg_done) {
2610 ceph_msg_put(con->out_msg);
2611 con->out_msg = NULL; /* we're done with this one */
2615 ret = write_partial_message_data(con);
2628 if (con->state == CON_STATE_OPEN) {
2629 if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
2630 prepare_write_keepalive(con);
2634 if (!list_empty(&con->out_queue)) {
2635 prepare_write_message(con);
2638 if (con->in_seq > con->in_seq_acked) {
2639 prepare_write_ack(con);
2645 con_flag_clear(con, CON_FLAG_WRITE_PENDING);
2649 dout("try_write done on %p ret %d\n", con, ret);
2656 static int try_read(struct ceph_connection *con)
2661 dout("try_read start on %p state %lu\n", con, con->state);
2662 if (con->state != CON_STATE_CONNECTING &&
2663 con->state != CON_STATE_NEGOTIATING &&
2664 con->state != CON_STATE_OPEN)
2667 BUG_ON(!con->sock);
2669 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
2670 con->in_base_pos);
2672 if (con->state == CON_STATE_CONNECTING) {
2674 ret = read_partial_banner(con);
2677 ret = process_banner(con);
2681 con->state = CON_STATE_NEGOTIATING;
2688 ret = prepare_write_connect(con);
2691 prepare_read_connect(con);
2697 if (con->state == CON_STATE_NEGOTIATING) {
2699 ret = read_partial_connect(con);
2702 ret = process_connect(con);
2708 WARN_ON(con->state != CON_STATE_OPEN);
2710 if (con->in_base_pos < 0) {
2714 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
2717 dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
2718 con->in_base_pos += ret;
2719 if (con->in_base_pos)
2722 if (con->in_tag == CEPH_MSGR_TAG_READY) {
2726 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
2729 dout("try_read got tag %d\n", (int)con->in_tag);
2730 switch (con->in_tag) {
2732 prepare_read_message(con);
2735 prepare_read_ack(con);
2738 prepare_read_keepalive_ack(con);
2741 con_close_socket(con);
2742 con->state = CON_STATE_CLOSED;
2748 if (con->in_tag == CEPH_MSGR_TAG_MSG) {
2749 ret = read_partial_message(con);
2753 con->error_msg = "bad crc/signature";
2759 con->error_msg = "io error";
2764 if (con->in_tag == CEPH_MSGR_TAG_READY)
2766 process_message(con);
2767 if (con->state == CON_STATE_OPEN)
2768 prepare_read_tag(con);
2771 if (con->in_tag == CEPH_MSGR_TAG_ACK ||
2772 con->in_tag == CEPH_MSGR_TAG_SEQ) {
2777 ret = read_partial_ack(con);
2780 process_ack(con);
2783 if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
2784 ret = read_keepalive_ack(con);
2791 dout("try_read done on %p ret %d\n", con, ret);
2795 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
2796 con->error_msg = "protocol error, garbage tag";
2804 * Bump @con reference to avoid races with connection teardown.
2807 static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
2809 if (!con->ops->get(con)) {
2810 dout("%s %p ref count 0\n", __func__, con);
2814 dout("%s %p %lu\n", __func__, con, delay);
2815 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
2816 dout("%s %p - already queued\n", __func__, con);
2817 con->ops->put(con);
2824 static void queue_con(struct ceph_connection *con)
2826 (void) queue_con_delay(con, 0);
2829 static void cancel_con(struct ceph_connection *con)
2831 if (cancel_delayed_work(&con->work)) {
2832 dout("%s %p\n", __func__, con);
2833 con->ops->put(con);
2837 static bool con_sock_closed(struct ceph_connection *con)
2839 if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
2844 con->error_msg = "socket closed (con state " #x ")"; \
2847 switch (con->state) {
2855 pr_warn("%s con %p unrecognized state %lu\n",
2856 __func__, con, con->state);
2857 con->error_msg = "unrecognized con state";
2866 static bool con_backoff(struct ceph_connection *con)
2870 if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
2873 ret = queue_con_delay(con, round_jiffies_relative(con->delay));
2875 dout("%s: con %p FAILED to back off %lu\n", __func__,
2876 con, con->delay);
2878 con_flag_set(con, CON_FLAG_BACKOFF);
2884 /* Finish fault handling; con->mutex must *not* be held here */
2886 static void con_fault_finish(struct ceph_connection *con)
2888 dout("%s %p\n", __func__, con);
2894 if (con->auth_retry) {
2895 dout("auth_retry %d, invalidating\n", con->auth_retry);
2896 if (con->ops->invalidate_authorizer)
2897 con->ops->invalidate_authorizer(con);
2898 con->auth_retry = 0;
2901 if (con->ops->fault)
2902 con->ops->fault(con);
2910 struct ceph_connection *con = container_of(work, struct ceph_connection,
2914 mutex_lock(&con->mutex);
2918 if ((fault = con_sock_closed(con))) {
2919 dout("%s: con %p SOCK_CLOSED\n", __func__, con);
2922 if (con_backoff(con)) {
2923 dout("%s: con %p BACKOFF\n", __func__, con);
2926 if (con->state == CON_STATE_STANDBY) {
2927 dout("%s: con %p STANDBY\n", __func__, con);
2930 if (con->state == CON_STATE_CLOSED) {
2931 dout("%s: con %p CLOSED\n", __func__, con);
2932 BUG_ON(con->sock);
2935 if (con->state == CON_STATE_PREOPEN) {
2936 dout("%s: con %p PREOPEN\n", __func__, con);
2937 BUG_ON(con->sock);
2940 ret = try_read(con);
2944 if (!con->error_msg)
2945 con->error_msg = "socket error on read";
2950 ret = try_write(con);
2954 if (!con->error_msg)
2955 con->error_msg = "socket error on write";
2962 con_fault(con);
2963 mutex_unlock(&con->mutex);
2966 con_fault_finish(con);
2968 con->ops->put(con);
2975 static void con_fault(struct ceph_connection *con)
2978 con, con->state, ceph_pr_addr(&con->peer_addr));
2980 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
2981 ceph_pr_addr(&con->peer_addr), con->error_msg);
2982 con->error_msg = NULL;
2984 WARN_ON(con->state != CON_STATE_CONNECTING &&
2985 con->state != CON_STATE_NEGOTIATING &&
2986 con->state != CON_STATE_OPEN);
2988 con_close_socket(con);
2990 if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
2992 con->state = CON_STATE_CLOSED;
2996 if (con->in_msg) {
2997 BUG_ON(con->in_msg->con != con);
2998 ceph_msg_put(con->in_msg);
2999 con->in_msg = NULL;
3001 if (con->out_msg) {
3002 BUG_ON(con->out_msg->con != con);
3003 ceph_msg_put(con->out_msg);
3004 con->out_msg = NULL;
3008 list_splice_init(&con->out_sent, &con->out_queue);
3012 if (list_empty(&con->out_queue) &&
3013 !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
3014 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
3015 con_flag_clear(con, CON_FLAG_WRITE_PENDING);
3016 con->state = CON_STATE_STANDBY;
3019 con->state = CON_STATE_PREOPEN;
3020 if (con->delay == 0)
3021 con->delay = BASE_DELAY_INTERVAL;
3022 else if (con->delay < MAX_DELAY_INTERVAL)
3023 con->delay *= 2;
3024 con_flag_set(con, CON_FLAG_BACKOFF);
3025 queue_con(con);
3066 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
3068 if (msg->con)
3069 msg->con->ops->put(msg->con);
3071 msg->con = con ? con->ops->get(con) : NULL;
3072 BUG_ON(msg->con != con);
3075 static void clear_standby(struct ceph_connection *con)
3078 if (con->state == CON_STATE_STANDBY) {
3079 dout("clear_standby %p and ++connect_seq\n", con);
3080 con->state = CON_STATE_PREOPEN;
3081 con->connect_seq++;
3082 WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
3083 WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
3090 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
3093 msg->hdr.src = con->msgr->inst.name;
3097 mutex_lock(&con->mutex);
3099 if (con->state == CON_STATE_CLOSED) {
3100 dout("con_send %p closed, dropping %p\n", con, msg);
3102 mutex_unlock(&con->mutex);
3106 msg_con_set(msg, con);
3109 list_add_tail(&msg->list_head, &con->out_queue);
3111 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
3117 clear_standby(con);
3118 mutex_unlock(&con->mutex);
3122 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
3123 queue_con(con);
3132 struct ceph_connection *con = msg->con;
3134 if (!con) {
3135 dout("%s msg %p null con\n", __func__, msg);
3139 mutex_lock(&con->mutex);
3141 dout("%s %p msg %p - was on queue\n", __func__, con, msg);
3147 if (con->out_msg == msg) {
3148 BUG_ON(con->out_skip);
3150 if (con->out_msg_done) {
3151 con->out_skip += con_out_kvec_skip(con);
3154 con->out_skip += sizeof_footer(con);
3158 con->out_skip += msg->cursor.total_resid;
3160 con->out_skip += con_out_kvec_skip(con);
3161 con->out_skip += con_out_kvec_skip(con);
3164 __func__, con, msg, con->out_kvec_bytes, con->out_skip);
3166 con->out_msg = NULL;
3170 mutex_unlock(&con->mutex);
3178 struct ceph_connection *con = msg->con;
3180 if (!con) {
3181 dout("%s msg %p null con\n", __func__, msg);
3185 mutex_lock(&con->mutex);
3186 if (con->in_msg == msg) {
3187 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
3188 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
3189 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
3192 dout("%s %p msg %p revoked\n", __func__, con, msg);
3193 con->in_base_pos = con->in_base_pos -
3199 ceph_msg_put(con->in_msg);
3200 con->in_msg = NULL;
3201 con->in_tag = CEPH_MSGR_TAG_READY;
3202 con->in_seq++;
3205 __func__, con, con->in_msg, msg);
3207 mutex_unlock(&con->mutex);
3213 void ceph_con_keepalive(struct ceph_connection *con)
3215 dout("con_keepalive %p\n", con);
3216 mutex_lock(&con->mutex);
3217 clear_standby(con);
3218 con_flag_set(con, CON_FLAG_KEEPALIVE_PENDING);
3219 mutex_unlock(&con->mutex);
3221 if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
3222 queue_con(con);
3226 bool ceph_con_keepalive_expired(struct ceph_connection *con,
3230 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
3235 ts = timespec64_add(con->last_keepalive_ack, ts);
3396 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
3414 * connection, and save the result in con->in_msg. Uses the
3421 * - con->in_msg == NULL
3423 * - con->in_msg is non-null.
3425 * - con->in_msg == NULL
3427 static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
3429 struct ceph_msg_header *hdr = &con->in_hdr;
3434 BUG_ON(con->in_msg != NULL);
3435 BUG_ON(!con->ops->alloc_msg);
3437 mutex_unlock(&con->mutex);
3438 msg = con->ops->alloc_msg(con, hdr, skip);
3439 mutex_lock(&con->mutex);
3440 if (con->state != CON_STATE_OPEN) {
3447 msg_con_set(msg, con);
3448 con->in_msg = msg;
3458 con->error_msg = "error allocating memory for incoming message";
3461 memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
3463 if (middle_len && !con->in_msg->middle) {
3464 ret = ceph_alloc_middle(con, con->in_msg);
3466 ceph_msg_put(con->in_msg);
3467 con->in_msg = NULL;