// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with the Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of ceph-mon daemons use a modified version of the Paxos part-time
 * parliament algorithm to manage the MDS map (mds cluster membership),
 * OSD map, and list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
static struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
	struct ceph_monmap *m = NULL;
	int i, err = -EINVAL;
	struct ceph_fsid fsid;
	u32 epoch, num_mon;
	u32 len;

	ceph_decode_32_safe(&p, end, len, bad);
	ceph_decode_need(&p, end, len, bad);

	dout("monmap_decode %p %p len %d (%d)\n", p, end, len, (int)(end-p));
	p += sizeof(u16);  /* skip version */

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(&p);

	num_mon = ceph_decode_32(&p);

	if (num_mon > CEPH_MAX_MON)
		goto bad;
	m = kmalloc(struct_size(m, mon_inst, num_mon), GFP_NOFS);
	if (m == NULL)
		return ERR_PTR(-ENOMEM);
	m->fsid = fsid;
	m->epoch = epoch;
	m->num_mon = num_mon;
	for (i = 0; i < num_mon; ++i) {
		struct ceph_entity_inst *inst = &m->mon_inst[i];

		/* copy name portion */
		ceph_decode_copy_safe(&p, end, &inst->name,
				      sizeof(inst->name), bad);
		err = ceph_decode_entity_addr(&p, end, &inst->addr);
		if (err)
			goto bad;
	}
	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
	     m->num_mon);
	for (i = 0; i < m->num_mon; i++)
		dout("monmap_decode mon%d is %s\n", i,
		     ceph_pr_addr(&m->mon_inst[i].addr));
	return m;
bad:
	dout("monmap_decode failed with %d\n", err);
	kfree(m);
	return ERR_PTR(err);
}

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++) {
		if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
			return 1;
	}

	return 0;
}

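/*
 * For reference, the monmap blob consumed by ceph_monmap_decode() above
 * is laid out like this on the wire (multi-byte fields little-endian);
 * this is an orientation sketch only, the decode calls above are
 * authoritative:
 *
 *	__le32 len              - length of the payload that follows
 *	__le16 version          - skipped
 *	ceph_fsid fsid
 *	__le32 epoch
 *	__le32 num_mon          - bounded by CEPH_MAX_MON
 *	num_mon x {
 *		ceph_entity_name name
 *		entity addr     - decoded via ceph_decode_entity_addr()
 *	}
 */
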
/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);

	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;
		int n;

		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;
		}

		n = prandom_u32() % max;
		if (o >= 0 && n >= o)
			n++;

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
	int ret;

	pick_new_mon(monc);

	monc->hunting = true;
	if (monc->had_a_connection) {
		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
	}

	monc->sub_renew_after = jiffies;  /* i.e., expired */
	monc->sub_renew_sent = 0;

	dout("%s opening mon%d\n", __func__, monc->cur_mon);
	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
		      &monc->monmap->mon_inst[monc->cur_mon].addr);

	/*
	 * send an initial keepalive to ensure our timestamp is valid
	 * by the time we are in an OPENED state
	 */
	ceph_con_keepalive(&monc->con);

	/* initiate authentication handshake */
	ret = ceph_auth_build_hello(monc->auth,
				    monc->m_auth->front.iov_base,
				    monc->m_auth->front_alloc_len);
	BUG_ON(ret <= 0);
	__send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
			monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));

	__close_session(monc);
	__open_session(monc);
}

void ceph_monc_reopen_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	reopen_session(monc);
	mutex_unlock(&monc->mutex);
}

static void un_backoff(struct ceph_mon_client *monc)
{
	monc->hunt_mult /= 2;  /* reduce by 50% */
	if (monc->hunt_mult < 1)
		monc->hunt_mult = 1;
	dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
}

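/*
 * Worked example of the hunting backoff (a sketch; the CEPH_MONC_*
 * constants live in <linux/ceph/mon_client.h>).  __open_session() grows
 * hunt_mult on each repeated hunt, and __schedule_delayed() below turns
 * it into a retry delay:
 *
 *	hunt 1:  CEPH_MONC_HUNT_INTERVAL * 1
 *	hunt 2:  CEPH_MONC_HUNT_INTERVAL * CEPH_MONC_HUNT_BACKOFF
 *	hunt 3:  CEPH_MONC_HUNT_INTERVAL * CEPH_MONC_HUNT_BACKOFF^2
 *	...
 *
 * capped once hunt_mult reaches CEPH_MONC_HUNT_MAX_MULT.  Each healthy
 * delayed_work() tick calls un_backoff(), halving hunt_mult back toward 1.
 */
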
/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned long delay;

	if (monc->hunting)
		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
	else
		delay = CEPH_MONC_PING_INTERVAL;

	dout("__schedule_delayed after %lu\n", delay);
	mod_delayed_work(system_wq, &monc->delayed_work,
			 round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
	[CEPH_SUB_MONMAP] = "monmap",
	[CEPH_SUB_OSDMAP] = "osdmap",
	[CEPH_SUB_FSMAP]  = "fsmap.user",
	[CEPH_SUB_MDSMAP] = "mdsmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg = monc->m_subscribe;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	int num = 0;
	int i;

	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

	BUG_ON(monc->cur_mon < 0);

	if (!monc->sub_renew_sent)
		monc->sub_renew_sent = jiffies | 1;  /* never 0 */

	msg->hdr.version = cpu_to_le16(2);

	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		if (monc->subs[i].want)
			num++;
	}
	BUG_ON(num < 1);  /* monmap sub is always there */
	ceph_encode_32(&p, num);
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		char buf[32];
		int len;

		if (!monc->subs[i].want)
			continue;

		len = sprintf(buf, "%s", ceph_sub_str[i]);
		if (i == CEPH_SUB_MDSMAP &&
		    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
			len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

		dout("%s %s start %llu flags 0x%x\n", __func__, buf,
		     le64_to_cpu(monc->subs[i].item.start),
		     monc->subs[i].item.flags);
		ceph_encode_string(&p, end, buf, len);
		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
		p += sizeof(monc->subs[i].item);
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	ceph_msg_revoke(msg);
	ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->sub_renew_sent) {
		/*
		 * This is only needed for legacy (infernalis or older)
		 * MONs -- see delayed_work().
		 */
		monc->sub_renew_after = monc->sub_renew_sent +
					(seconds >> 1) * HZ - 1;
		dout("%s sent %lu duration %d renew after %lu\n", __func__,
		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
		monc->sub_renew_sent = 0;
	} else {
		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
		     monc->sub_renew_sent, monc->sub_renew_after);
	}
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

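/*
 * For reference, the payload built by __send_subscribe() above (v2
 * encoding) is, in sketch form:
 *
 *	__le32 num                - number of subscriptions
 *	num x {
 *		string name       - "monmap", "osdmap", "mdsmap.<id>", ...
 *		struct ceph_mon_subscribe_item item   - start epoch + flags
 *	}
 *
 * The monitor replies with CEPH_MSG_MON_SUBSCRIBE_ACK, consumed by
 * handle_subscribe_ack() above.
 */
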
/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
				 u32 epoch, bool continuous)
{
	__le64 start = cpu_to_le64(epoch);
	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
	     epoch, continuous);

	if (monc->subs[sub].want &&
	    monc->subs[sub].item.start == start &&
	    monc->subs[sub].item.flags == flags)
		return false;

	monc->subs[sub].item.start = start;
	monc->subs[sub].item.flags = flags;
	monc->subs[sub].want = true;

	return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool need_request;

	mutex_lock(&monc->mutex);
	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
				u32 epoch)
{
	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

	if (monc->subs[sub].want) {
		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
			monc->subs[sub].want = false;
		else
			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
	}

	monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
			  unsigned long timeout)
{
	unsigned long started = jiffies;
	long ret;

	mutex_lock(&monc->mutex);
	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
		mutex_unlock(&monc->mutex);

		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
				monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
				ceph_timeout_jiffies(timeout));
		if (ret < 0)
			return ret;

		mutex_lock(&monc->mutex);
	}

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);

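/*
 * A minimal (hypothetical) caller of the exports above: register
 * interest, kick a subscribe if the want-set changed, then block for
 * the map.  This mirrors what the osd_client does when it needs a
 * newer osdmap:
 *
 *	if (ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, epoch, false))
 *		ceph_monc_renew_subs(monc);
 *	ret = ceph_monc_wait_osdmap(monc, epoch, timeout);
 */
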
/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		ceph_msg_dump(msg);
		goto out;
	}

	if (ceph_check_fsid(client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	kfree(monc->monmap);
	monc->monmap = monmap;

	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
	client->have_fsid = true;

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
	     req->reply);
	WARN_ON(!RB_EMPTY_NODE(&req->node));

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	if (req)
		kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
	struct ceph_mon_generic_request *req;

	req = kzalloc(sizeof(*req), gfp);
	if (!req)
		return NULL;

	req->monc = monc;
	kref_init(&req->kref);
	RB_CLEAR_NODE(&req->node);
	init_completion(&req->completion);

	dout("%s greq %p\n", __func__, req);
	return req;
}

static void register_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	WARN_ON(req->tid);

	get_generic_request(req);
	req->tid = ++monc->last_tid;
	insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
				 struct ceph_mon_generic_request *req)
{
	WARN_ON(!req->tid);

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	req->request->hdr.tid = cpu_to_le64(req->tid);
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	erase_generic_request(&monc->generic_request_tree, req);

	ceph_msg_revoke(req->request);
	ceph_msg_revoke_incoming(req->reply);
}

static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);
	put_generic_request(req);
}

static void complete_generic_request(struct ceph_mon_generic_request *req)
{
	if (req->complete_cb)
		req->complete_cb(req);
	else
		complete_all(&req->completion);
	put_generic_request(req);
}

static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;
	struct ceph_mon_generic_request *lookup_req;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);

	mutex_lock(&monc->mutex);
	lookup_req = lookup_generic_request(&monc->generic_request_tree,
					    req->tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		finish_generic_request(req);
	}

	mutex_unlock(&monc->mutex);
}

static int wait_generic_request(struct ceph_mon_generic_request *req)
{
	int ret;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	ret = wait_for_completion_interruptible(&req->completion);
	if (ret)
		cancel_generic_request(req);
	else
		ret = req->result;  /* completed */

	return ret;
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

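/*
 * Lifecycle of a generic request, tying the helpers above together
 * (a sketch):
 *
 *	req = alloc_generic_request(monc, gfp);    - caller's ref
 *	register_generic_request(req);             - tid assigned, tree's ref
 *	send_generic_request(monc, req);
 *	ret = wait_generic_request(req);           - completion or cancel
 *	put_generic_request(req);                  - drop caller's ref
 *
 * On reply, the handler (e.g. handle_statfs_reply() below) looks the
 * request up by tid, sets req->result, detaches it with
 * __finish_generic_request(), and calls complete_generic_request(),
 * which drops the tree's ref.
 */
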
/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	*req->u.st = reply->st;  /* struct */
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt statfs reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
			struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int ret = -ENOMEM;

	req = alloc_generic_request(monc, GFP_NOFS);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
	if (!req->reply)
		goto out;

	req->u.st = buf;
	req->request->hdr.version = cpu_to_le16(2);

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->contains_data_pool = (data_pool != CEPH_NOPOOL);
	h->data_pool = cpu_to_le64(data_pool);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

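/*
 * A minimal (hypothetical) caller of the export above; in practice the
 * real user is the filesystem's statfs path:
 *
 *	struct ceph_statfs st;
 *	int ret = ceph_monc_do_statfs(monc, CEPH_NOPOOL, &st);
 *	if (!ret)
 *		... fields such as st.kb / st.kb_avail are now filled in ...
 */
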
static void handle_get_version_reply(struct ceph_mon_client *monc,
				     struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front_alloc_len;
	u64 handle;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
	handle = ceph_decode_64(&p);
	if (tid != 0 && tid != handle)
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, handle);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	req->u.newest = ceph_decode_64(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto err_put_req;

	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
				    sizeof(u64) + sizeof(u32) + strlen(what),
				    GFP_NOIO, true);
	if (!req->request)
		goto err_put_req;

	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
				  true);
	if (!req->reply)
		goto err_put_req;

	req->complete_cb = cb;
	req->private_data = private_data;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	{
		void *p = req->request->front.iov_base;
		void *const end = p + req->request->front_alloc_len;

		ceph_encode_64(&p, req->tid);  /* handle */
		ceph_encode_string(&p, end, what, strlen(what));
		WARN_ON(p != end);
	}
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	return req;

err_put_req:
	put_generic_request(req);
	return ERR_PTR(-ENOMEM);
}

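/*
 * Wire exchange implemented by the pair above (a sketch):
 *
 *	request:  __le64 handle (our tid)  +  string what ("osdmap", ...)
 *	reply:    __le64 handle            +  __le64 version
 *
 * handle_get_version_reply() matches on the handle embedded in the
 * payload, so it still works when the reply's header tid is 0 (see the
 * workaround in mon_alloc_msg() below).
 */
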
/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			  u64 *newest)
{
	struct ceph_mon_generic_request *req;
	int ret;

	req = __ceph_monc_get_version(monc, what, NULL, 0);
	if (IS_ERR(req))
		return PTR_ERR(req);

	ret = wait_generic_request(req);
	if (!ret)
		*newest = req->u.newest;

	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);

/*
 * Send MMonGetVersion; @cb is invoked with the result when the reply
 * arrives.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
				ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = __ceph_monc_get_version(monc, what, cb, private_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	put_generic_request(req);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);

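/*
 * A minimal (hypothetical) use of the synchronous variant:
 *
 *	u64 newest;
 *	int ret = ceph_monc_get_version(monc, "osdmap", &newest);
 *	if (!ret)
 *		... newest is the cluster's latest osdmap epoch ...
 */
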
static void handle_command_ack(struct ceph_mon_client *monc,
			       struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
			 sizeof(u32), bad);
	p += sizeof(struct ceph_mon_request_header);

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = ceph_decode_32(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_command ack, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static __printf(2, 0)
int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
			 va_list ap)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_command *h;
	int ret = -ENOMEM;
	int len;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
				  true);
	if (!req->reply)
		goto out;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->num_strs = cpu_to_le32(1);
	len = vsprintf(h->str, fmt, ap);
	h->str_len = cpu_to_le32(len);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}

static __printf(2, 3)
int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = do_mon_command_vargs(monc, fmt, ap);
	va_end(ap);
	return ret;
}

int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
			    struct ceph_entity_addr *client_addr)
{
	int ret;

	ret = do_mon_command(monc,
			     "{ \"prefix\": \"osd blocklist\", \
			       \"blocklistop\": \"add\", \
			       \"addr\": \"%pISpc/%u\" }",
			     &client_addr->in_addr,
			     le32_to_cpu(client_addr->nonce));
	if (ret == -EINVAL) {
		/*
		 * The monitor returns EINVAL on an unrecognized command.
		 * Try the legacy command -- it is exactly the same except
		 * for the name.
		 */
		ret = do_mon_command(monc,
				     "{ \"prefix\": \"osd blacklist\", \
				       \"blacklistop\": \"add\", \
				       \"addr\": \"%pISpc/%u\" }",
				     &client_addr->in_addr,
				     le32_to_cpu(client_addr->nonce));
	}
	if (ret)
		return ret;

	/*
	 * Make sure we have the osdmap that includes the blocklist
	 * entry.  This is needed to ensure that the OSDs pick up the
	 * new blocklist before processing any future requests from
	 * this client.
	 */
	return ceph_wait_for_latest_osdmap(monc->client, 0);
}
EXPORT_SYMBOL(ceph_monc_blocklist_add);

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon < 0) {
		goto out;
	}

	if (monc->hunting) {
		dout("%s continuing hunt\n", __func__);
		reopen_session(monc);
	} else {
		int is_auth = ceph_auth_is_authenticated(monc->auth);

		dout("%s is_authed %d\n", __func__, is_auth);
		if (ceph_con_keepalive_expired(&monc->con,
					       CEPH_MONC_PING_TIMEOUT)) {
			dout("monc keepalive timeout\n");
			is_auth = 0;
			reopen_session(monc);
		}

		if (!monc->hunting) {
			ceph_con_keepalive(&monc->con);
			__validate_auth(monc);
			un_backoff(monc);
		}

		if (is_auth &&
		    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
			unsigned long now = jiffies;

			dout("%s renew subs? now %lu renew after %lu\n",
			     __func__, now, monc->sub_renew_after);
			if (time_after_eq(now, monc->sub_renew_after))
				__send_subscribe(monc);
		}
	}
	__schedule_delayed(monc);

out:
	mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_KERNEL, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
					 GFP_KERNEL, true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
					  GFP_KERNEL, true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->last_tid = 0;

	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

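/*
 * Typical setup/teardown, as driven by libceph (a sketch; see
 * ceph_create_client() and ceph_destroy_client() in ceph_common.c):
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	...
 *	ceph_monc_open_session(&client->monc);    - at mount/open time
 *	...
 *	ceph_monc_stop(&client->monc);            - on unmount/destroy
 */
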
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->hunting = false;
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	cancel_delayed_work_sync(&monc->delayed_work);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
	if (monc->hunting) {
		dout("%s found mon%d\n", __func__, monc->cur_mon);
		monc->hunting = false;
		monc->had_a_connection = true;
		un_backoff(monc);
		__schedule_delayed(monc);
	}
}

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	was_auth = ceph_auth_is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
		goto out;
	}

	finish_hunting(monc);

	if (ret < 0) {
		monc->client->auth_err = ret;
	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr));
	}

out:
	mutex_unlock(&monc->mutex);
	if (monc->client->auth_err < 0)
		wake_up_all(&monc->client->auth_wq);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

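/*
 * The authentication handshake above, end to end (a sketch):
 *
 *	__open_session()
 *	    ceph_auth_build_hello() -> __send_prepared_auth_request()
 *	handle_auth_reply()
 *	    ret > 0:  another round -- send the prepared request, repeat
 *	    ret == 0 and newly authenticated:  finish_hunting(), set the
 *	        client entity name, __send_subscribe(),
 *	        __resend_generic_request()
 *	    ret < 0:  record monc->client->auth_err and wake waiters
 */
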
/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_COMMAND_ACK:
		handle_command_ack(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_STATFS_REPLY:
	case CEPH_MSG_MON_COMMAND_ACK:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_GET_VERSION_REPLY:
		if (le64_to_cpu(hdr->tid) != 0)
			return get_generic_reply(con, hdr, skip);

		/*
		 * Older OSDs don't set reply tid even if the original
		 * request had a non-zero tid.  Work around this weirdness
		 * by allocating a new message.
		 */
		fallthrough;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_FS_MAP_USER:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	} else if (front_len > m->front_alloc_len) {
		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
			front_len, m->front_alloc_len,
			(unsigned int)con->peer_name.type,
			le64_to_cpu(con->peer_name.num));
		ceph_msg_put(m);
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
	}

	return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon >= 0) {
		if (!monc->hunting) {
			dout("%s hunting for new mon\n", __func__);
			reopen_session(monc);
			__schedule_delayed(monc);
		} else {
			dout("%s already hunting\n", __func__);
		}
	}
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};