1// SPDX-License-Identifier: GPL-2.0 2#include <linux/ceph/ceph_debug.h> 3 4#include <linux/fs.h> 5#include <linux/wait.h> 6#include <linux/slab.h> 7#include <linux/gfp.h> 8#include <linux/sched.h> 9#include <linux/debugfs.h> 10#include <linux/seq_file.h> 11#include <linux/ratelimit.h> 12#include <linux/bits.h> 13#include <linux/ktime.h> 14 15#include "super.h" 16#include "mds_client.h" 17 18#include <linux/ceph/ceph_features.h> 19#include <linux/ceph/messenger.h> 20#include <linux/ceph/decode.h> 21#include <linux/ceph/pagelist.h> 22#include <linux/ceph/auth.h> 23#include <linux/ceph/debugfs.h> 24 25#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE) 26 27/* 28 * A cluster of MDS (metadata server) daemons is responsible for 29 * managing the file system namespace (the directory hierarchy and 30 * inodes) and for coordinating shared access to storage. Metadata is 31 * partitioning hierarchically across a number of servers, and that 32 * partition varies over time as the cluster adjusts the distribution 33 * in order to balance load. 34 * 35 * The MDS client is primarily responsible to managing synchronous 36 * metadata requests for operations like open, unlink, and so forth. 37 * If there is a MDS failure, we find out about it when we (possibly 38 * request and) receive a new MDS map, and can resubmit affected 39 * requests. 40 * 41 * For the most part, though, we take advantage of a lossless 42 * communications channel to the MDS, and do not need to worry about 43 * timing out or resubmitting requests. 44 * 45 * We maintain a stateful "session" with each MDS we interact with. 46 * Within each session, we sent periodic heartbeat messages to ensure 47 * any capabilities or leases we have been issues remain valid. If 48 * the session times out and goes stale, our leases and capabilities 49 * are no longer valid. 50 */ 51 52struct ceph_reconnect_state { 53 struct ceph_mds_session *session; 54 int nr_caps, nr_realms; 55 struct ceph_pagelist *pagelist; 56 unsigned msg_version; 57 bool allow_multi; 58}; 59 60static void __wake_requests(struct ceph_mds_client *mdsc, 61 struct list_head *head); 62static void ceph_cap_release_work(struct work_struct *work); 63static void ceph_cap_reclaim_work(struct work_struct *work); 64 65static const struct ceph_connection_operations mds_con_ops; 66 67 68/* 69 * mds reply parsing 70 */ 71 72static int parse_reply_info_quota(void **p, void *end, 73 struct ceph_mds_reply_info_in *info) 74{ 75 u8 struct_v, struct_compat; 76 u32 struct_len; 77 78 ceph_decode_8_safe(p, end, struct_v, bad); 79 ceph_decode_8_safe(p, end, struct_compat, bad); 80 /* struct_v is expected to be >= 1. we only 81 * understand encoding with struct_compat == 1. */ 82 if (!struct_v || struct_compat != 1) 83 goto bad; 84 ceph_decode_32_safe(p, end, struct_len, bad); 85 ceph_decode_need(p, end, struct_len, bad); 86 end = *p + struct_len; 87 ceph_decode_64_safe(p, end, info->max_bytes, bad); 88 ceph_decode_64_safe(p, end, info->max_files, bad); 89 *p = end; 90 return 0; 91bad: 92 return -EIO; 93} 94 95/* 96 * parse individual inode info 97 */ 98static int parse_reply_info_in(void **p, void *end, 99 struct ceph_mds_reply_info_in *info, 100 u64 features) 101{ 102 int err = 0; 103 u8 struct_v = 0; 104 105 if (features == (u64)-1) { 106 u32 struct_len; 107 u8 struct_compat; 108 ceph_decode_8_safe(p, end, struct_v, bad); 109 ceph_decode_8_safe(p, end, struct_compat, bad); 110 /* struct_v is expected to be >= 1. we only understand 111 * encoding with struct_compat == 1. 
*/ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 *p = end; 180 } else { 181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 182 ceph_decode_64_safe(p, end, info->inline_version, bad); 183 ceph_decode_32_safe(p, end, info->inline_len, bad); 184 ceph_decode_need(p, end, info->inline_len, bad); 185 info->inline_data = *p; 186 *p += info->inline_len; 187 } else 188 info->inline_version = CEPH_INLINE_NONE; 189 190 if (features & CEPH_FEATURE_MDS_QUOTA) { 191 err = parse_reply_info_quota(p, end, info); 192 if (err < 0) 193 goto out_bad; 194 } else { 195 info->max_bytes = 0; 196 info->max_files = 0; 197 } 198 199 info->pool_ns_len = 0; 200 info->pool_ns_data = NULL; 201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 203 if (info->pool_ns_len > 0) { 204 ceph_decode_need(p, end, info->pool_ns_len, bad); 205 info->pool_ns_data = *p; 206 *p += info->pool_ns_len; 207 } 208 } 209 210 if (features & CEPH_FEATURE_FS_BTIME) { 211 ceph_decode_need(p, end, sizeof(info->btime), bad); 212 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 213 ceph_decode_64_safe(p, end, info->change_attr, bad); 214 } 215 216 info->dir_pin = -ENODATA; 217 /* info->snap_btime remains zero */ 218 } 219 return 0; 220bad: 221 err = -EIO; 222out_bad: 223 return err; 224} 225 226static int parse_reply_info_dir(void **p, void *end, 227 struct ceph_mds_reply_dirfrag **dirfrag, 228 u64 features) 229{ 230 if (features == (u64)-1) { 
231 u8 struct_v, struct_compat; 232 u32 struct_len; 233 ceph_decode_8_safe(p, end, struct_v, bad); 234 ceph_decode_8_safe(p, end, struct_compat, bad); 235 /* struct_v is expected to be >= 1. we only understand 236 * encoding whose struct_compat == 1. */ 237 if (!struct_v || struct_compat != 1) 238 goto bad; 239 ceph_decode_32_safe(p, end, struct_len, bad); 240 ceph_decode_need(p, end, struct_len, bad); 241 end = *p + struct_len; 242 } 243 244 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 245 *dirfrag = *p; 246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 247 if (unlikely(*p > end)) 248 goto bad; 249 if (features == (u64)-1) 250 *p = end; 251 return 0; 252bad: 253 return -EIO; 254} 255 256static int parse_reply_info_lease(void **p, void *end, 257 struct ceph_mds_reply_lease **lease, 258 u64 features) 259{ 260 if (features == (u64)-1) { 261 u8 struct_v, struct_compat; 262 u32 struct_len; 263 ceph_decode_8_safe(p, end, struct_v, bad); 264 ceph_decode_8_safe(p, end, struct_compat, bad); 265 /* struct_v is expected to be >= 1. we only understand 266 * encoding whose struct_compat == 1. */ 267 if (!struct_v || struct_compat != 1) 268 goto bad; 269 ceph_decode_32_safe(p, end, struct_len, bad); 270 ceph_decode_need(p, end, struct_len, bad); 271 end = *p + struct_len; 272 } 273 274 ceph_decode_need(p, end, sizeof(**lease), bad); 275 *lease = *p; 276 *p += sizeof(**lease); 277 if (features == (u64)-1) 278 *p = end; 279 return 0; 280bad: 281 return -EIO; 282} 283 284/* 285 * parse a normal reply, which may contain a (dir+)dentry and/or a 286 * target inode. 287 */ 288static int parse_reply_info_trace(void **p, void *end, 289 struct ceph_mds_reply_info_parsed *info, 290 u64 features) 291{ 292 int err; 293 294 if (info->head->is_dentry) { 295 err = parse_reply_info_in(p, end, &info->diri, features); 296 if (err < 0) 297 goto out_bad; 298 299 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 300 if (err < 0) 301 goto out_bad; 302 303 ceph_decode_32_safe(p, end, info->dname_len, bad); 304 ceph_decode_need(p, end, info->dname_len, bad); 305 info->dname = *p; 306 *p += info->dname_len; 307 308 err = parse_reply_info_lease(p, end, &info->dlease, features); 309 if (err < 0) 310 goto out_bad; 311 } 312 313 if (info->head->is_target) { 314 err = parse_reply_info_in(p, end, &info->targeti, features); 315 if (err < 0) 316 goto out_bad; 317 } 318 319 if (unlikely(*p != end)) 320 goto bad; 321 return 0; 322 323bad: 324 err = -EIO; 325out_bad: 326 pr_err("problem parsing mds trace %d\n", err); 327 return err; 328} 329 330/* 331 * parse readdir results 332 */ 333static int parse_reply_info_readdir(void **p, void *end, 334 struct ceph_mds_reply_info_parsed *info, 335 u64 features) 336{ 337 u32 num, i = 0; 338 int err; 339 340 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 341 if (err < 0) 342 goto out_bad; 343 344 ceph_decode_need(p, end, sizeof(num) + 2, bad); 345 num = ceph_decode_32(p); 346 { 347 u16 flags = ceph_decode_16(p); 348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 352 } 353 if (num == 0) 354 goto done; 355 356 BUG_ON(!info->dir_entries); 357 if ((unsigned long)(info->dir_entries + num) > 358 (unsigned long)info->dir_entries + info->dir_buf_size) { 359 pr_err("dir contents are larger than expected\n"); 360 WARN_ON(1); 361 goto bad; 362 } 363 364 info->dir_nr 
= num; 365 while (num) { 366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 367 /* dentry */ 368 ceph_decode_32_safe(p, end, rde->name_len, bad); 369 ceph_decode_need(p, end, rde->name_len, bad); 370 rde->name = *p; 371 *p += rde->name_len; 372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 373 374 /* dentry lease */ 375 err = parse_reply_info_lease(p, end, &rde->lease, features); 376 if (err) 377 goto out_bad; 378 /* inode */ 379 err = parse_reply_info_in(p, end, &rde->inode, features); 380 if (err < 0) 381 goto out_bad; 382 /* ceph_readdir_prepopulate() will update it */ 383 rde->offset = 0; 384 i++; 385 num--; 386 } 387 388done: 389 /* Skip over any unrecognized fields */ 390 *p = end; 391 return 0; 392 393bad: 394 err = -EIO; 395out_bad: 396 pr_err("problem parsing dir contents %d\n", err); 397 return err; 398} 399 400/* 401 * parse fcntl F_GETLK results 402 */ 403static int parse_reply_info_filelock(void **p, void *end, 404 struct ceph_mds_reply_info_parsed *info, 405 u64 features) 406{ 407 if (*p + sizeof(*info->filelock_reply) > end) 408 goto bad; 409 410 info->filelock_reply = *p; 411 412 /* Skip over any unrecognized fields */ 413 *p = end; 414 return 0; 415bad: 416 return -EIO; 417} 418 419 420#if BITS_PER_LONG == 64 421 422#define DELEGATED_INO_AVAILABLE xa_mk_value(1) 423 424static int ceph_parse_deleg_inos(void **p, void *end, 425 struct ceph_mds_session *s) 426{ 427 u32 sets; 428 429 ceph_decode_32_safe(p, end, sets, bad); 430 dout("got %u sets of delegated inodes\n", sets); 431 while (sets--) { 432 u64 start, len, ino; 433 434 ceph_decode_64_safe(p, end, start, bad); 435 ceph_decode_64_safe(p, end, len, bad); 436 437 /* Don't accept a delegation of system inodes */ 438 if (start < CEPH_INO_SYSTEM_BASE) { 439 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 440 start, len); 441 continue; 442 } 443 while (len--) { 444 int err = xa_insert(&s->s_delegated_inos, ino = start++, 445 DELEGATED_INO_AVAILABLE, 446 GFP_KERNEL); 447 if (!err) { 448 dout("added delegated inode 0x%llx\n", 449 start - 1); 450 } else if (err == -EBUSY) { 451 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 452 start - 1); 453 } else { 454 return err; 455 } 456 } 457 } 458 return 0; 459bad: 460 return -EIO; 461} 462 463u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 464{ 465 unsigned long ino; 466 void *val; 467 468 xa_for_each(&s->s_delegated_inos, ino, val) { 469 val = xa_erase(&s->s_delegated_inos, ino); 470 if (val == DELEGATED_INO_AVAILABLE) 471 return ino; 472 } 473 return 0; 474} 475 476int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 477{ 478 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 479 GFP_KERNEL); 480} 481#else /* BITS_PER_LONG == 64 */ 482/* 483 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 484 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 485 * and bottom words? 
486 */ 487static int ceph_parse_deleg_inos(void **p, void *end, 488 struct ceph_mds_session *s) 489{ 490 u32 sets; 491 492 ceph_decode_32_safe(p, end, sets, bad); 493 if (sets) 494 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 495 return 0; 496bad: 497 return -EIO; 498} 499 500u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 501{ 502 return 0; 503} 504 505int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 506{ 507 return 0; 508} 509#endif /* BITS_PER_LONG == 64 */ 510 511/* 512 * parse create results 513 */ 514static int parse_reply_info_create(void **p, void *end, 515 struct ceph_mds_reply_info_parsed *info, 516 u64 features, struct ceph_mds_session *s) 517{ 518 int ret; 519 520 if (features == (u64)-1 || 521 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 522 if (*p == end) { 523 /* Malformed reply? */ 524 info->has_create_ino = false; 525 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 526 u8 struct_v, struct_compat; 527 u32 len; 528 529 info->has_create_ino = true; 530 ceph_decode_8_safe(p, end, struct_v, bad); 531 ceph_decode_8_safe(p, end, struct_compat, bad); 532 ceph_decode_32_safe(p, end, len, bad); 533 ceph_decode_64_safe(p, end, info->ino, bad); 534 ret = ceph_parse_deleg_inos(p, end, s); 535 if (ret) 536 return ret; 537 } else { 538 /* legacy */ 539 ceph_decode_64_safe(p, end, info->ino, bad); 540 info->has_create_ino = true; 541 } 542 } else { 543 if (*p != end) 544 goto bad; 545 } 546 547 /* Skip over any unrecognized fields */ 548 *p = end; 549 return 0; 550bad: 551 return -EIO; 552} 553 554/* 555 * parse extra results 556 */ 557static int parse_reply_info_extra(void **p, void *end, 558 struct ceph_mds_reply_info_parsed *info, 559 u64 features, struct ceph_mds_session *s) 560{ 561 u32 op = le32_to_cpu(info->head->op); 562 563 if (op == CEPH_MDS_OP_GETFILELOCK) 564 return parse_reply_info_filelock(p, end, info, features); 565 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 566 return parse_reply_info_readdir(p, end, info, features); 567 else if (op == CEPH_MDS_OP_CREATE) 568 return parse_reply_info_create(p, end, info, features, s); 569 else 570 return -EIO; 571} 572 573/* 574 * parse entire mds reply 575 */ 576static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 577 struct ceph_mds_reply_info_parsed *info, 578 u64 features) 579{ 580 void *p, *end; 581 u32 len; 582 int err; 583 584 info->head = msg->front.iov_base; 585 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 586 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 587 588 /* trace */ 589 ceph_decode_32_safe(&p, end, len, bad); 590 if (len > 0) { 591 ceph_decode_need(&p, end, len, bad); 592 err = parse_reply_info_trace(&p, p+len, info, features); 593 if (err < 0) 594 goto out_bad; 595 } 596 597 /* extra */ 598 ceph_decode_32_safe(&p, end, len, bad); 599 if (len > 0) { 600 ceph_decode_need(&p, end, len, bad); 601 err = parse_reply_info_extra(&p, p+len, info, features, s); 602 if (err < 0) 603 goto out_bad; 604 } 605 606 /* snap blob */ 607 ceph_decode_32_safe(&p, end, len, bad); 608 info->snapblob_len = len; 609 info->snapblob = p; 610 p += len; 611 612 if (p != end) 613 goto bad; 614 return 0; 615 616bad: 617 err = -EIO; 618out_bad: 619 pr_err("mds parse_reply err %d\n", err); 620 return err; 621} 622 623static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 624{ 625 if (!info->dir_entries) 626 return; 627 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 628} 
629 630 631/* 632 * sessions 633 */ 634const char *ceph_session_state_name(int s) 635{ 636 switch (s) { 637 case CEPH_MDS_SESSION_NEW: return "new"; 638 case CEPH_MDS_SESSION_OPENING: return "opening"; 639 case CEPH_MDS_SESSION_OPEN: return "open"; 640 case CEPH_MDS_SESSION_HUNG: return "hung"; 641 case CEPH_MDS_SESSION_CLOSING: return "closing"; 642 case CEPH_MDS_SESSION_CLOSED: return "closed"; 643 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 644 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 645 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 646 default: return "???"; 647 } 648} 649 650struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 651{ 652 if (refcount_inc_not_zero(&s->s_ref)) { 653 dout("mdsc get_session %p %d -> %d\n", s, 654 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 655 return s; 656 } else { 657 dout("mdsc get_session %p 0 -- FAIL\n", s); 658 return NULL; 659 } 660} 661 662void ceph_put_mds_session(struct ceph_mds_session *s) 663{ 664 if (IS_ERR_OR_NULL(s)) 665 return; 666 667 dout("mdsc put_session %p %d -> %d\n", s, 668 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 669 if (refcount_dec_and_test(&s->s_ref)) { 670 if (s->s_auth.authorizer) 671 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 672 WARN_ON(mutex_is_locked(&s->s_mutex)); 673 xa_destroy(&s->s_delegated_inos); 674 kfree(s); 675 } 676} 677 678/* 679 * called under mdsc->mutex 680 */ 681struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 682 int mds) 683{ 684 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 685 return NULL; 686 return ceph_get_mds_session(mdsc->sessions[mds]); 687} 688 689static bool __have_session(struct ceph_mds_client *mdsc, int mds) 690{ 691 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 692 return false; 693 else 694 return true; 695} 696 697static int __verify_registered_session(struct ceph_mds_client *mdsc, 698 struct ceph_mds_session *s) 699{ 700 if (s->s_mds >= mdsc->max_sessions || 701 mdsc->sessions[s->s_mds] != s) 702 return -ENOENT; 703 return 0; 704} 705 706/* 707 * create+register a new session for given mds. 708 * called under mdsc->mutex. 
709 */ 710static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 711 int mds) 712{ 713 struct ceph_mds_session *s; 714 715 if (mds >= mdsc->mdsmap->possible_max_rank) 716 return ERR_PTR(-EINVAL); 717 718 s = kzalloc(sizeof(*s), GFP_NOFS); 719 if (!s) 720 return ERR_PTR(-ENOMEM); 721 722 if (mds >= mdsc->max_sessions) { 723 int newmax = 1 << get_count_order(mds + 1); 724 struct ceph_mds_session **sa; 725 726 dout("%s: realloc to %d\n", __func__, newmax); 727 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 728 if (!sa) 729 goto fail_realloc; 730 if (mdsc->sessions) { 731 memcpy(sa, mdsc->sessions, 732 mdsc->max_sessions * sizeof(void *)); 733 kfree(mdsc->sessions); 734 } 735 mdsc->sessions = sa; 736 mdsc->max_sessions = newmax; 737 } 738 739 dout("%s: mds%d\n", __func__, mds); 740 s->s_mdsc = mdsc; 741 s->s_mds = mds; 742 s->s_state = CEPH_MDS_SESSION_NEW; 743 s->s_ttl = 0; 744 s->s_seq = 0; 745 mutex_init(&s->s_mutex); 746 747 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 748 749 spin_lock_init(&s->s_gen_ttl_lock); 750 s->s_cap_gen = 1; 751 s->s_cap_ttl = jiffies - 1; 752 753 spin_lock_init(&s->s_cap_lock); 754 s->s_renew_requested = 0; 755 s->s_renew_seq = 0; 756 INIT_LIST_HEAD(&s->s_caps); 757 s->s_nr_caps = 0; 758 refcount_set(&s->s_ref, 1); 759 INIT_LIST_HEAD(&s->s_waiting); 760 INIT_LIST_HEAD(&s->s_unsafe); 761 xa_init(&s->s_delegated_inos); 762 s->s_num_cap_releases = 0; 763 s->s_cap_reconnect = 0; 764 s->s_cap_iterator = NULL; 765 INIT_LIST_HEAD(&s->s_cap_releases); 766 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 767 768 INIT_LIST_HEAD(&s->s_cap_dirty); 769 INIT_LIST_HEAD(&s->s_cap_flushing); 770 771 mdsc->sessions[mds] = s; 772 atomic_inc(&mdsc->num_sessions); 773 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 774 775 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 776 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 777 778 return s; 779 780fail_realloc: 781 kfree(s); 782 return ERR_PTR(-ENOMEM); 783} 784 785/* 786 * called under mdsc->mutex 787 */ 788static void __unregister_session(struct ceph_mds_client *mdsc, 789 struct ceph_mds_session *s) 790{ 791 dout("__unregister_session mds%d %p\n", s->s_mds, s); 792 BUG_ON(mdsc->sessions[s->s_mds] != s); 793 mdsc->sessions[s->s_mds] = NULL; 794 ceph_con_close(&s->s_con); 795 ceph_put_mds_session(s); 796 atomic_dec(&mdsc->num_sessions); 797} 798 799/* 800 * drop session refs in request. 
801 * 802 * should be last request ref, or hold mdsc->mutex 803 */ 804static void put_request_session(struct ceph_mds_request *req) 805{ 806 if (req->r_session) { 807 ceph_put_mds_session(req->r_session); 808 req->r_session = NULL; 809 } 810} 811 812void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc, 813 void (*cb)(struct ceph_mds_session *), 814 bool check_state) 815{ 816 int mds; 817 818 mutex_lock(&mdsc->mutex); 819 for (mds = 0; mds < mdsc->max_sessions; ++mds) { 820 struct ceph_mds_session *s; 821 822 s = __ceph_lookup_mds_session(mdsc, mds); 823 if (!s) 824 continue; 825 826 if (check_state && !check_session_state(s)) { 827 ceph_put_mds_session(s); 828 continue; 829 } 830 831 mutex_unlock(&mdsc->mutex); 832 cb(s); 833 ceph_put_mds_session(s); 834 mutex_lock(&mdsc->mutex); 835 } 836 mutex_unlock(&mdsc->mutex); 837} 838 839void ceph_mdsc_release_request(struct kref *kref) 840{ 841 struct ceph_mds_request *req = container_of(kref, 842 struct ceph_mds_request, 843 r_kref); 844 ceph_mdsc_release_dir_caps_no_check(req); 845 destroy_reply_info(&req->r_reply_info); 846 if (req->r_request) 847 ceph_msg_put(req->r_request); 848 if (req->r_reply) 849 ceph_msg_put(req->r_reply); 850 if (req->r_inode) { 851 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 852 /* avoid calling iput_final() in mds dispatch threads */ 853 ceph_async_iput(req->r_inode); 854 } 855 if (req->r_parent) { 856 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 857 ceph_async_iput(req->r_parent); 858 } 859 ceph_async_iput(req->r_target_inode); 860 if (req->r_dentry) 861 dput(req->r_dentry); 862 if (req->r_old_dentry) 863 dput(req->r_old_dentry); 864 if (req->r_old_dentry_dir) { 865 /* 866 * track (and drop pins for) r_old_dentry_dir 867 * separately, since r_old_dentry's d_parent may have 868 * changed between the dir mutex being dropped and 869 * this request being freed. 870 */ 871 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 872 CEPH_CAP_PIN); 873 ceph_async_iput(req->r_old_dentry_dir); 874 } 875 kfree(req->r_path1); 876 kfree(req->r_path2); 877 if (req->r_pagelist) 878 ceph_pagelist_release(req->r_pagelist); 879 put_request_session(req); 880 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 881 WARN_ON_ONCE(!list_empty(&req->r_wait)); 882 kmem_cache_free(ceph_mds_request_cachep, req); 883} 884 885DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 886 887/* 888 * lookup session, bump ref if found. 889 * 890 * called under mdsc->mutex. 891 */ 892static struct ceph_mds_request * 893lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 894{ 895 struct ceph_mds_request *req; 896 897 req = lookup_request(&mdsc->request_tree, tid); 898 if (req) 899 ceph_mdsc_get_request(req); 900 901 return req; 902} 903 904/* 905 * Register an in-flight request, and assign a tid. Link to directory 906 * are modifying (if any). 907 * 908 * Called under mdsc->mutex. 
909 */ 910static void __register_request(struct ceph_mds_client *mdsc, 911 struct ceph_mds_request *req, 912 struct inode *dir) 913{ 914 int ret = 0; 915 916 req->r_tid = ++mdsc->last_tid; 917 if (req->r_num_caps) { 918 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 919 req->r_num_caps); 920 if (ret < 0) { 921 pr_err("__register_request %p " 922 "failed to reserve caps: %d\n", req, ret); 923 /* set req->r_err to fail early from __do_request */ 924 req->r_err = ret; 925 return; 926 } 927 } 928 dout("__register_request %p tid %lld\n", req, req->r_tid); 929 ceph_mdsc_get_request(req); 930 insert_request(&mdsc->request_tree, req); 931 932 req->r_uid = current_fsuid(); 933 req->r_gid = current_fsgid(); 934 935 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 936 mdsc->oldest_tid = req->r_tid; 937 938 if (dir) { 939 struct ceph_inode_info *ci = ceph_inode(dir); 940 941 ihold(dir); 942 req->r_unsafe_dir = dir; 943 spin_lock(&ci->i_unsafe_lock); 944 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 945 spin_unlock(&ci->i_unsafe_lock); 946 } 947} 948 949static void __unregister_request(struct ceph_mds_client *mdsc, 950 struct ceph_mds_request *req) 951{ 952 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 953 954 /* Never leave an unregistered request on an unsafe list! */ 955 list_del_init(&req->r_unsafe_item); 956 957 if (req->r_tid == mdsc->oldest_tid) { 958 struct rb_node *p = rb_next(&req->r_node); 959 mdsc->oldest_tid = 0; 960 while (p) { 961 struct ceph_mds_request *next_req = 962 rb_entry(p, struct ceph_mds_request, r_node); 963 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 964 mdsc->oldest_tid = next_req->r_tid; 965 break; 966 } 967 p = rb_next(p); 968 } 969 } 970 971 erase_request(&mdsc->request_tree, req); 972 973 if (req->r_unsafe_dir) { 974 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 975 spin_lock(&ci->i_unsafe_lock); 976 list_del_init(&req->r_unsafe_dir_item); 977 spin_unlock(&ci->i_unsafe_lock); 978 } 979 if (req->r_target_inode && 980 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 981 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 982 spin_lock(&ci->i_unsafe_lock); 983 list_del_init(&req->r_unsafe_target_item); 984 spin_unlock(&ci->i_unsafe_lock); 985 } 986 987 if (req->r_unsafe_dir) { 988 /* avoid calling iput_final() in mds dispatch threads */ 989 ceph_async_iput(req->r_unsafe_dir); 990 req->r_unsafe_dir = NULL; 991 } 992 993 complete_all(&req->r_safe_completion); 994 995 ceph_mdsc_put_request(req); 996} 997 998/* 999 * Walk back up the dentry tree until we hit a dentry representing a 1000 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1001 * when calling this) to ensure that the objects won't disappear while we're 1002 * working with them. Once we hit a candidate dentry, we attempt to take a 1003 * reference to it, and return that as the result. 1004 */ 1005static struct inode *get_nonsnap_parent(struct dentry *dentry) 1006{ 1007 struct inode *inode = NULL; 1008 1009 while (dentry && !IS_ROOT(dentry)) { 1010 inode = d_inode_rcu(dentry); 1011 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1012 break; 1013 dentry = dentry->d_parent; 1014 } 1015 if (inode) 1016 inode = igrab(inode); 1017 return inode; 1018} 1019 1020/* 1021 * Choose mds to send request to next. If there is a hint set in the 1022 * request (e.g., due to a prior forward hint from the mds), use that. 1023 * Otherwise, consult frag tree and/or caps to identify the 1024 * appropriate mds. 
If all else fails, choose randomly. 1025 * 1026 * Called under mdsc->mutex. 1027 */ 1028static int __choose_mds(struct ceph_mds_client *mdsc, 1029 struct ceph_mds_request *req, 1030 bool *random) 1031{ 1032 struct inode *inode; 1033 struct ceph_inode_info *ci; 1034 struct ceph_cap *cap; 1035 int mode = req->r_direct_mode; 1036 int mds = -1; 1037 u32 hash = req->r_direct_hash; 1038 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1039 1040 if (random) 1041 *random = false; 1042 1043 /* 1044 * is there a specific mds we should try? ignore hint if we have 1045 * no session and the mds is not up (active or recovering). 1046 */ 1047 if (req->r_resend_mds >= 0 && 1048 (__have_session(mdsc, req->r_resend_mds) || 1049 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1050 dout("%s using resend_mds mds%d\n", __func__, 1051 req->r_resend_mds); 1052 return req->r_resend_mds; 1053 } 1054 1055 if (mode == USE_RANDOM_MDS) 1056 goto random; 1057 1058 inode = NULL; 1059 if (req->r_inode) { 1060 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1061 inode = req->r_inode; 1062 ihold(inode); 1063 } else { 1064 /* req->r_dentry is non-null for LSSNAP request */ 1065 rcu_read_lock(); 1066 inode = get_nonsnap_parent(req->r_dentry); 1067 rcu_read_unlock(); 1068 dout("%s using snapdir's parent %p\n", __func__, inode); 1069 } 1070 } else if (req->r_dentry) { 1071 /* ignore race with rename; old or new d_parent is okay */ 1072 struct dentry *parent; 1073 struct inode *dir; 1074 1075 rcu_read_lock(); 1076 parent = READ_ONCE(req->r_dentry->d_parent); 1077 dir = req->r_parent ? : d_inode_rcu(parent); 1078 1079 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1080 /* not this fs or parent went negative */ 1081 inode = d_inode(req->r_dentry); 1082 if (inode) 1083 ihold(inode); 1084 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1085 /* direct snapped/virtual snapdir requests 1086 * based on parent dir inode */ 1087 inode = get_nonsnap_parent(parent); 1088 dout("%s using nonsnap parent %p\n", __func__, inode); 1089 } else { 1090 /* dentry target */ 1091 inode = d_inode(req->r_dentry); 1092 if (!inode || mode == USE_AUTH_MDS) { 1093 /* dir + name */ 1094 inode = igrab(dir); 1095 hash = ceph_dentry_hash(dir, req->r_dentry); 1096 is_hash = true; 1097 } else { 1098 ihold(inode); 1099 } 1100 } 1101 rcu_read_unlock(); 1102 } 1103 1104 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1105 hash, mode); 1106 if (!inode) 1107 goto random; 1108 ci = ceph_inode(inode); 1109 1110 if (is_hash && S_ISDIR(inode->i_mode)) { 1111 struct ceph_inode_frag frag; 1112 int found; 1113 1114 ceph_choose_frag(ci, hash, &frag, &found); 1115 if (found) { 1116 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1117 u8 r; 1118 1119 /* choose a random replica */ 1120 get_random_bytes(&r, 1); 1121 r %= frag.ndist; 1122 mds = frag.dist[r]; 1123 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1124 __func__, inode, ceph_vinop(inode), 1125 frag.frag, mds, (int)r, frag.ndist); 1126 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1127 CEPH_MDS_STATE_ACTIVE && 1128 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1129 goto out; 1130 } 1131 1132 /* since this file/dir wasn't known to be 1133 * replicated, then we want to look for the 1134 * authoritative mds. 
*/ 1135 if (frag.mds >= 0) { 1136 /* choose auth mds */ 1137 mds = frag.mds; 1138 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1139 __func__, inode, ceph_vinop(inode), 1140 frag.frag, mds); 1141 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1142 CEPH_MDS_STATE_ACTIVE) { 1143 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1144 mds)) 1145 goto out; 1146 } 1147 } 1148 mode = USE_AUTH_MDS; 1149 } 1150 } 1151 1152 spin_lock(&ci->i_ceph_lock); 1153 cap = NULL; 1154 if (mode == USE_AUTH_MDS) 1155 cap = ci->i_auth_cap; 1156 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1157 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1158 if (!cap) { 1159 spin_unlock(&ci->i_ceph_lock); 1160 ceph_async_iput(inode); 1161 goto random; 1162 } 1163 mds = cap->session->s_mds; 1164 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1165 inode, ceph_vinop(inode), mds, 1166 cap == ci->i_auth_cap ? "auth " : "", cap); 1167 spin_unlock(&ci->i_ceph_lock); 1168out: 1169 /* avoid calling iput_final() while holding mdsc->mutex or 1170 * in mds dispatch threads */ 1171 ceph_async_iput(inode); 1172 return mds; 1173 1174random: 1175 if (random) 1176 *random = true; 1177 1178 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1179 dout("%s chose random mds%d\n", __func__, mds); 1180 return mds; 1181} 1182 1183 1184/* 1185 * session messages 1186 */ 1187struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq) 1188{ 1189 struct ceph_msg *msg; 1190 struct ceph_mds_session_head *h; 1191 1192 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1193 false); 1194 if (!msg) { 1195 pr_err("ENOMEM creating session %s msg\n", 1196 ceph_session_op_name(op)); 1197 return NULL; 1198 } 1199 h = msg->front.iov_base; 1200 h->op = cpu_to_le32(op); 1201 h->seq = cpu_to_le64(seq); 1202 1203 return msg; 1204} 1205 1206static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1207#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1208static int encode_supported_features(void **p, void *end) 1209{ 1210 static const size_t count = ARRAY_SIZE(feature_bits); 1211 1212 if (count > 0) { 1213 size_t i; 1214 size_t size = FEATURE_BYTES(count); 1215 unsigned long bit; 1216 1217 if (WARN_ON_ONCE(*p + 4 + size > end)) 1218 return -ERANGE; 1219 1220 ceph_encode_32(p, size); 1221 memset(*p, 0, size); 1222 for (i = 0; i < count; i++) { 1223 bit = feature_bits[i]; 1224 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8); 1225 } 1226 *p += size; 1227 } else { 1228 if (WARN_ON_ONCE(*p + 4 > end)) 1229 return -ERANGE; 1230 1231 ceph_encode_32(p, 0); 1232 } 1233 1234 return 0; 1235} 1236 1237static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1238#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1239static int encode_metric_spec(void **p, void *end) 1240{ 1241 static const size_t count = ARRAY_SIZE(metric_bits); 1242 1243 /* header */ 1244 if (WARN_ON_ONCE(*p + 2 > end)) 1245 return -ERANGE; 1246 1247 ceph_encode_8(p, 1); /* version */ 1248 ceph_encode_8(p, 1); /* compat */ 1249 1250 if (count > 0) { 1251 size_t i; 1252 size_t size = METRIC_BYTES(count); 1253 1254 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1255 return -ERANGE; 1256 1257 /* metric spec info length */ 1258 ceph_encode_32(p, 4 + size); 1259 1260 /* metric spec */ 1261 ceph_encode_32(p, size); 1262 memset(*p, 0, size); 1263 for (i = 0; i < count; i++) 1264 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1265 *p += size; 1266 } else { 1267 if (WARN_ON_ONCE(*p + 4 + 4 
> end)) 1268 return -ERANGE; 1269 1270 /* metric spec info length */ 1271 ceph_encode_32(p, 4); 1272 /* metric spec */ 1273 ceph_encode_32(p, 0); 1274 } 1275 1276 return 0; 1277} 1278 1279/* 1280 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1281 * to include additional client metadata fields. 1282 */ 1283static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1284{ 1285 struct ceph_msg *msg; 1286 struct ceph_mds_session_head *h; 1287 int i = -1; 1288 int extra_bytes = 0; 1289 int metadata_key_count = 0; 1290 struct ceph_options *opt = mdsc->fsc->client->options; 1291 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1292 size_t size, count; 1293 void *p, *end; 1294 int ret; 1295 1296 const char* metadata[][2] = { 1297 {"hostname", mdsc->nodename}, 1298 {"kernel_version", init_utsname()->release}, 1299 {"entity_id", opt->name ? : ""}, 1300 {"root", fsopt->server_path ? : "/"}, 1301 {NULL, NULL} 1302 }; 1303 1304 /* Calculate serialized length of metadata */ 1305 extra_bytes = 4; /* map length */ 1306 for (i = 0; metadata[i][0]; ++i) { 1307 extra_bytes += 8 + strlen(metadata[i][0]) + 1308 strlen(metadata[i][1]); 1309 metadata_key_count++; 1310 } 1311 1312 /* supported feature */ 1313 size = 0; 1314 count = ARRAY_SIZE(feature_bits); 1315 if (count > 0) 1316 size = FEATURE_BYTES(count); 1317 extra_bytes += 4 + size; 1318 1319 /* metric spec */ 1320 size = 0; 1321 count = ARRAY_SIZE(metric_bits); 1322 if (count > 0) 1323 size = METRIC_BYTES(count); 1324 extra_bytes += 2 + 4 + 4 + size; 1325 1326 /* Allocate the message */ 1327 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1328 GFP_NOFS, false); 1329 if (!msg) { 1330 pr_err("ENOMEM creating session open msg\n"); 1331 return ERR_PTR(-ENOMEM); 1332 } 1333 p = msg->front.iov_base; 1334 end = p + msg->front.iov_len; 1335 1336 h = p; 1337 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1338 h->seq = cpu_to_le64(seq); 1339 1340 /* 1341 * Serialize client metadata into waiting buffer space, using 1342 * the format that userspace expects for map<string, string> 1343 * 1344 * ClientSession messages with metadata are v4 1345 */ 1346 msg->hdr.version = cpu_to_le16(4); 1347 msg->hdr.compat_version = cpu_to_le16(1); 1348 1349 /* The write pointer, following the session_head structure */ 1350 p += sizeof(*h); 1351 1352 /* Number of entries in the map */ 1353 ceph_encode_32(&p, metadata_key_count); 1354 1355 /* Two length-prefixed strings for each entry in the map */ 1356 for (i = 0; metadata[i][0]; ++i) { 1357 size_t const key_len = strlen(metadata[i][0]); 1358 size_t const val_len = strlen(metadata[i][1]); 1359 1360 ceph_encode_32(&p, key_len); 1361 memcpy(p, metadata[i][0], key_len); 1362 p += key_len; 1363 ceph_encode_32(&p, val_len); 1364 memcpy(p, metadata[i][1], val_len); 1365 p += val_len; 1366 } 1367 1368 ret = encode_supported_features(&p, end); 1369 if (ret) { 1370 pr_err("encode_supported_features failed!\n"); 1371 ceph_msg_put(msg); 1372 return ERR_PTR(ret); 1373 } 1374 1375 ret = encode_metric_spec(&p, end); 1376 if (ret) { 1377 pr_err("encode_metric_spec failed!\n"); 1378 ceph_msg_put(msg); 1379 return ERR_PTR(ret); 1380 } 1381 1382 msg->front.iov_len = p - msg->front.iov_base; 1383 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1384 1385 return msg; 1386} 1387 1388/* 1389 * send session open request. 
1390 * 1391 * called under mdsc->mutex 1392 */ 1393static int __open_session(struct ceph_mds_client *mdsc, 1394 struct ceph_mds_session *session) 1395{ 1396 struct ceph_msg *msg; 1397 int mstate; 1398 int mds = session->s_mds; 1399 1400 /* wait for mds to go active? */ 1401 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1402 dout("open_session to mds%d (%s)\n", mds, 1403 ceph_mds_state_name(mstate)); 1404 session->s_state = CEPH_MDS_SESSION_OPENING; 1405 session->s_renew_requested = jiffies; 1406 1407 /* send connect message */ 1408 msg = create_session_open_msg(mdsc, session->s_seq); 1409 if (IS_ERR(msg)) 1410 return PTR_ERR(msg); 1411 ceph_con_send(&session->s_con, msg); 1412 return 0; 1413} 1414 1415/* 1416 * open sessions for any export targets for the given mds 1417 * 1418 * called under mdsc->mutex 1419 */ 1420static struct ceph_mds_session * 1421__open_export_target_session(struct ceph_mds_client *mdsc, int target) 1422{ 1423 struct ceph_mds_session *session; 1424 int ret; 1425 1426 session = __ceph_lookup_mds_session(mdsc, target); 1427 if (!session) { 1428 session = register_session(mdsc, target); 1429 if (IS_ERR(session)) 1430 return session; 1431 } 1432 if (session->s_state == CEPH_MDS_SESSION_NEW || 1433 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1434 ret = __open_session(mdsc, session); 1435 if (ret) 1436 return ERR_PTR(ret); 1437 } 1438 1439 return session; 1440} 1441 1442struct ceph_mds_session * 1443ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1444{ 1445 struct ceph_mds_session *session; 1446 1447 dout("open_export_target_session to mds%d\n", target); 1448 1449 mutex_lock(&mdsc->mutex); 1450 session = __open_export_target_session(mdsc, target); 1451 mutex_unlock(&mdsc->mutex); 1452 1453 return session; 1454} 1455 1456static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1457 struct ceph_mds_session *session) 1458{ 1459 struct ceph_mds_info *mi; 1460 struct ceph_mds_session *ts; 1461 int i, mds = session->s_mds; 1462 1463 if (mds >= mdsc->mdsmap->possible_max_rank) 1464 return; 1465 1466 mi = &mdsc->mdsmap->m_info[mds]; 1467 dout("open_export_target_sessions for mds%d (%d targets)\n", 1468 session->s_mds, mi->num_export_targets); 1469 1470 for (i = 0; i < mi->num_export_targets; i++) { 1471 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1472 ceph_put_mds_session(ts); 1473 } 1474} 1475 1476void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1477 struct ceph_mds_session *session) 1478{ 1479 mutex_lock(&mdsc->mutex); 1480 __open_export_target_sessions(mdsc, session); 1481 mutex_unlock(&mdsc->mutex); 1482} 1483 1484/* 1485 * session caps 1486 */ 1487 1488static void detach_cap_releases(struct ceph_mds_session *session, 1489 struct list_head *target) 1490{ 1491 lockdep_assert_held(&session->s_cap_lock); 1492 1493 list_splice_init(&session->s_cap_releases, target); 1494 session->s_num_cap_releases = 0; 1495 dout("dispose_cap_releases mds%d\n", session->s_mds); 1496} 1497 1498static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1499 struct list_head *dispose) 1500{ 1501 while (!list_empty(dispose)) { 1502 struct ceph_cap *cap; 1503 /* zero out the in-progress message */ 1504 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1505 list_del(&cap->session_caps); 1506 ceph_put_cap(mdsc, cap); 1507 } 1508} 1509 1510static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1511 struct ceph_mds_session *session) 1512{ 1513 struct ceph_mds_request *req; 1514 
struct rb_node *p; 1515 1516 dout("cleanup_session_requests mds%d\n", session->s_mds); 1517 mutex_lock(&mdsc->mutex); 1518 while (!list_empty(&session->s_unsafe)) { 1519 req = list_first_entry(&session->s_unsafe, 1520 struct ceph_mds_request, r_unsafe_item); 1521 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1522 req->r_tid); 1523 if (req->r_target_inode) 1524 mapping_set_error(req->r_target_inode->i_mapping, -EIO); 1525 if (req->r_unsafe_dir) 1526 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO); 1527 __unregister_request(mdsc, req); 1528 } 1529 /* zero r_attempts, so kick_requests() will re-send requests */ 1530 p = rb_first(&mdsc->request_tree); 1531 while (p) { 1532 req = rb_entry(p, struct ceph_mds_request, r_node); 1533 p = rb_next(p); 1534 if (req->r_session && 1535 req->r_session->s_mds == session->s_mds) 1536 req->r_attempts = 0; 1537 } 1538 mutex_unlock(&mdsc->mutex); 1539} 1540 1541/* 1542 * Helper to safely iterate over all caps associated with a session, with 1543 * special care taken to handle a racing __ceph_remove_cap(). 1544 * 1545 * Caller must hold session s_mutex. 1546 */ 1547int ceph_iterate_session_caps(struct ceph_mds_session *session, 1548 int (*cb)(struct inode *, struct ceph_cap *, 1549 void *), void *arg) 1550{ 1551 struct list_head *p; 1552 struct ceph_cap *cap; 1553 struct inode *inode, *last_inode = NULL; 1554 struct ceph_cap *old_cap = NULL; 1555 int ret; 1556 1557 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1558 spin_lock(&session->s_cap_lock); 1559 p = session->s_caps.next; 1560 while (p != &session->s_caps) { 1561 cap = list_entry(p, struct ceph_cap, session_caps); 1562 inode = igrab(&cap->ci->vfs_inode); 1563 if (!inode) { 1564 p = p->next; 1565 continue; 1566 } 1567 session->s_cap_iterator = cap; 1568 spin_unlock(&session->s_cap_lock); 1569 1570 if (last_inode) { 1571 /* avoid calling iput_final() while holding 1572 * s_mutex or in mds dispatch threads */ 1573 ceph_async_iput(last_inode); 1574 last_inode = NULL; 1575 } 1576 if (old_cap) { 1577 ceph_put_cap(session->s_mdsc, old_cap); 1578 old_cap = NULL; 1579 } 1580 1581 ret = cb(inode, cap, arg); 1582 last_inode = inode; 1583 1584 spin_lock(&session->s_cap_lock); 1585 p = p->next; 1586 if (!cap->ci) { 1587 dout("iterate_session_caps finishing cap %p removal\n", 1588 cap); 1589 BUG_ON(cap->session != session); 1590 cap->session = NULL; 1591 list_del_init(&cap->session_caps); 1592 session->s_nr_caps--; 1593 atomic64_dec(&session->s_mdsc->metric.total_caps); 1594 if (cap->queue_release) 1595 __ceph_queue_cap_release(session, cap); 1596 else 1597 old_cap = cap; /* put_cap it w/o locks held */ 1598 } 1599 if (ret < 0) 1600 goto out; 1601 } 1602 ret = 0; 1603out: 1604 session->s_cap_iterator = NULL; 1605 spin_unlock(&session->s_cap_lock); 1606 1607 ceph_async_iput(last_inode); 1608 if (old_cap) 1609 ceph_put_cap(session->s_mdsc, old_cap); 1610 1611 return ret; 1612} 1613 1614static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode) 1615{ 1616 struct ceph_inode_info *ci = ceph_inode(inode); 1617 struct ceph_cap_snap *capsnap; 1618 int capsnap_release = 0; 1619 1620 lockdep_assert_held(&ci->i_ceph_lock); 1621 1622 dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode); 1623 1624 while (!list_empty(&ci->i_cap_snaps)) { 1625 capsnap = list_first_entry(&ci->i_cap_snaps, 1626 struct ceph_cap_snap, ci_item); 1627 __ceph_remove_capsnap(inode, capsnap, NULL, NULL); 1628 ceph_put_snap_context(capsnap->context); 1629 ceph_put_cap_snap(capsnap); 1630 
capsnap_release++; 1631 } 1632 wake_up_all(&ci->i_cap_wq); 1633 wake_up_all(&mdsc->cap_flushing_wq); 1634 return capsnap_release; 1635} 1636 1637static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1638 void *arg) 1639{ 1640 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1641 struct ceph_mds_client *mdsc = fsc->mdsc; 1642 struct ceph_inode_info *ci = ceph_inode(inode); 1643 LIST_HEAD(to_remove); 1644 bool dirty_dropped = false; 1645 bool invalidate = false; 1646 int capsnap_release = 0; 1647 1648 dout("removing cap %p, ci is %p, inode is %p\n", 1649 cap, ci, &ci->vfs_inode); 1650 spin_lock(&ci->i_ceph_lock); 1651 __ceph_remove_cap(cap, false); 1652 if (!ci->i_auth_cap) { 1653 struct ceph_cap_flush *cf; 1654 1655 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 1656 if (inode->i_data.nrpages > 0) 1657 invalidate = true; 1658 if (ci->i_wrbuffer_ref > 0) 1659 mapping_set_error(&inode->i_data, -EIO); 1660 } 1661 1662 while (!list_empty(&ci->i_cap_flush_list)) { 1663 cf = list_first_entry(&ci->i_cap_flush_list, 1664 struct ceph_cap_flush, i_list); 1665 list_move(&cf->i_list, &to_remove); 1666 } 1667 1668 spin_lock(&mdsc->cap_dirty_lock); 1669 1670 list_for_each_entry(cf, &to_remove, i_list) 1671 list_del_init(&cf->g_list); 1672 1673 if (!list_empty(&ci->i_dirty_item)) { 1674 pr_warn_ratelimited( 1675 " dropping dirty %s state for %p %lld\n", 1676 ceph_cap_string(ci->i_dirty_caps), 1677 inode, ceph_ino(inode)); 1678 ci->i_dirty_caps = 0; 1679 list_del_init(&ci->i_dirty_item); 1680 dirty_dropped = true; 1681 } 1682 if (!list_empty(&ci->i_flushing_item)) { 1683 pr_warn_ratelimited( 1684 " dropping dirty+flushing %s state for %p %lld\n", 1685 ceph_cap_string(ci->i_flushing_caps), 1686 inode, ceph_ino(inode)); 1687 ci->i_flushing_caps = 0; 1688 list_del_init(&ci->i_flushing_item); 1689 mdsc->num_cap_flushing--; 1690 dirty_dropped = true; 1691 } 1692 spin_unlock(&mdsc->cap_dirty_lock); 1693 1694 if (dirty_dropped) { 1695 mapping_set_error(inode->i_mapping, -EIO); 1696 1697 if (ci->i_wrbuffer_ref_head == 0 && 1698 ci->i_wr_ref == 0 && 1699 ci->i_dirty_caps == 0 && 1700 ci->i_flushing_caps == 0) { 1701 ceph_put_snap_context(ci->i_head_snapc); 1702 ci->i_head_snapc = NULL; 1703 } 1704 } 1705 1706 if (atomic_read(&ci->i_filelock_ref) > 0) { 1707 /* make further file lock syscall return -EIO */ 1708 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1709 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1710 inode, ceph_ino(inode)); 1711 } 1712 1713 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1714 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1715 ci->i_prealloc_cap_flush = NULL; 1716 } 1717 1718 if (!list_empty(&ci->i_cap_snaps)) 1719 capsnap_release = remove_capsnaps(mdsc, inode); 1720 } 1721 spin_unlock(&ci->i_ceph_lock); 1722 while (!list_empty(&to_remove)) { 1723 struct ceph_cap_flush *cf; 1724 cf = list_first_entry(&to_remove, 1725 struct ceph_cap_flush, i_list); 1726 list_del_init(&cf->i_list); 1727 if (!cf->is_capsnap) 1728 ceph_free_cap_flush(cf); 1729 } 1730 1731 wake_up_all(&ci->i_cap_wq); 1732 if (invalidate) 1733 ceph_queue_invalidate(inode); 1734 if (dirty_dropped) 1735 iput(inode); 1736 while (capsnap_release--) 1737 iput(inode); 1738 return 0; 1739} 1740 1741/* 1742 * caller must hold session s_mutex 1743 */ 1744static void remove_session_caps(struct ceph_mds_session *session) 1745{ 1746 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1747 struct super_block *sb = fsc->sb; 1748 LIST_HEAD(dispose); 1749 1750 
dout("remove_session_caps on %p\n", session); 1751 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1752 1753 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1754 1755 spin_lock(&session->s_cap_lock); 1756 if (session->s_nr_caps > 0) { 1757 struct inode *inode; 1758 struct ceph_cap *cap, *prev = NULL; 1759 struct ceph_vino vino; 1760 /* 1761 * iterate_session_caps() skips inodes that are being 1762 * deleted, we need to wait until deletions are complete. 1763 * __wait_on_freeing_inode() is designed for the job, 1764 * but it is not exported, so use lookup inode function 1765 * to access it. 1766 */ 1767 while (!list_empty(&session->s_caps)) { 1768 cap = list_entry(session->s_caps.next, 1769 struct ceph_cap, session_caps); 1770 if (cap == prev) 1771 break; 1772 prev = cap; 1773 vino = cap->ci->i_vino; 1774 spin_unlock(&session->s_cap_lock); 1775 1776 inode = ceph_find_inode(sb, vino); 1777 /* avoid calling iput_final() while holding s_mutex */ 1778 ceph_async_iput(inode); 1779 1780 spin_lock(&session->s_cap_lock); 1781 } 1782 } 1783 1784 // drop cap expires and unlock s_cap_lock 1785 detach_cap_releases(session, &dispose); 1786 1787 BUG_ON(session->s_nr_caps > 0); 1788 BUG_ON(!list_empty(&session->s_cap_flushing)); 1789 spin_unlock(&session->s_cap_lock); 1790 dispose_cap_releases(session->s_mdsc, &dispose); 1791} 1792 1793enum { 1794 RECONNECT, 1795 RENEWCAPS, 1796 FORCE_RO, 1797}; 1798 1799/* 1800 * wake up any threads waiting on this session's caps. if the cap is 1801 * old (didn't get renewed on the client reconnect), remove it now. 1802 * 1803 * caller must hold s_mutex. 1804 */ 1805static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1806 void *arg) 1807{ 1808 struct ceph_inode_info *ci = ceph_inode(inode); 1809 unsigned long ev = (unsigned long)arg; 1810 1811 if (ev == RECONNECT) { 1812 spin_lock(&ci->i_ceph_lock); 1813 ci->i_wanted_max_size = 0; 1814 ci->i_requested_max_size = 0; 1815 spin_unlock(&ci->i_ceph_lock); 1816 } else if (ev == RENEWCAPS) { 1817 if (cap->cap_gen < cap->session->s_cap_gen) { 1818 /* mds did not re-issue stale cap */ 1819 spin_lock(&ci->i_ceph_lock); 1820 cap->issued = cap->implemented = CEPH_CAP_PIN; 1821 spin_unlock(&ci->i_ceph_lock); 1822 } 1823 } else if (ev == FORCE_RO) { 1824 } 1825 wake_up_all(&ci->i_cap_wq); 1826 return 0; 1827} 1828 1829static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1830{ 1831 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1832 ceph_iterate_session_caps(session, wake_up_session_cb, 1833 (void *)(unsigned long)ev); 1834} 1835 1836/* 1837 * Send periodic message to MDS renewing all currently held caps. The 1838 * ack will reset the expiration for all caps from this session. 1839 * 1840 * caller holds s_mutex 1841 */ 1842static int send_renew_caps(struct ceph_mds_client *mdsc, 1843 struct ceph_mds_session *session) 1844{ 1845 struct ceph_msg *msg; 1846 int state; 1847 1848 if (time_after_eq(jiffies, session->s_cap_ttl) && 1849 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1850 pr_info("mds%d caps stale\n", session->s_mds); 1851 session->s_renew_requested = jiffies; 1852 1853 /* do not try to renew caps until a recovering mds has reconnected 1854 * with its clients. 
*/ 1855 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1856 if (state < CEPH_MDS_STATE_RECONNECT) { 1857 dout("send_renew_caps ignoring mds%d (%s)\n", 1858 session->s_mds, ceph_mds_state_name(state)); 1859 return 0; 1860 } 1861 1862 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1863 ceph_mds_state_name(state)); 1864 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1865 ++session->s_renew_seq); 1866 if (!msg) 1867 return -ENOMEM; 1868 ceph_con_send(&session->s_con, msg); 1869 return 0; 1870} 1871 1872static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1873 struct ceph_mds_session *session, u64 seq) 1874{ 1875 struct ceph_msg *msg; 1876 1877 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1878 session->s_mds, ceph_session_state_name(session->s_state), seq); 1879 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1880 if (!msg) 1881 return -ENOMEM; 1882 ceph_con_send(&session->s_con, msg); 1883 return 0; 1884} 1885 1886 1887/* 1888 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1889 * 1890 * Called under session->s_mutex 1891 */ 1892static void renewed_caps(struct ceph_mds_client *mdsc, 1893 struct ceph_mds_session *session, int is_renew) 1894{ 1895 int was_stale; 1896 int wake = 0; 1897 1898 spin_lock(&session->s_cap_lock); 1899 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1900 1901 session->s_cap_ttl = session->s_renew_requested + 1902 mdsc->mdsmap->m_session_timeout*HZ; 1903 1904 if (was_stale) { 1905 if (time_before(jiffies, session->s_cap_ttl)) { 1906 pr_info("mds%d caps renewed\n", session->s_mds); 1907 wake = 1; 1908 } else { 1909 pr_info("mds%d caps still stale\n", session->s_mds); 1910 } 1911 } 1912 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1913 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1914 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1915 spin_unlock(&session->s_cap_lock); 1916 1917 if (wake) 1918 wake_up_session_caps(session, RENEWCAPS); 1919} 1920 1921/* 1922 * send a session close request 1923 */ 1924static int request_close_session(struct ceph_mds_session *session) 1925{ 1926 struct ceph_msg *msg; 1927 1928 dout("request_close_session mds%d state %s seq %lld\n", 1929 session->s_mds, ceph_session_state_name(session->s_state), 1930 session->s_seq); 1931 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 1932 session->s_seq); 1933 if (!msg) 1934 return -ENOMEM; 1935 ceph_con_send(&session->s_con, msg); 1936 return 1; 1937} 1938 1939/* 1940 * Called with s_mutex held. 1941 */ 1942static int __close_session(struct ceph_mds_client *mdsc, 1943 struct ceph_mds_session *session) 1944{ 1945 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1946 return 0; 1947 session->s_state = CEPH_MDS_SESSION_CLOSING; 1948 return request_close_session(session); 1949} 1950 1951static bool drop_negative_children(struct dentry *dentry) 1952{ 1953 struct dentry *child; 1954 bool all_negative = true; 1955 1956 if (!d_is_dir(dentry)) 1957 goto out; 1958 1959 spin_lock(&dentry->d_lock); 1960 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1961 if (d_really_is_positive(child)) { 1962 all_negative = false; 1963 break; 1964 } 1965 } 1966 spin_unlock(&dentry->d_lock); 1967 1968 if (all_negative) 1969 shrink_dcache_parent(dentry); 1970out: 1971 return all_negative; 1972} 1973 1974/* 1975 * Trim old(er) caps. 
1976 * 1977 * Because we can't cache an inode without one or more caps, we do 1978 * this indirectly: if a cap is unused, we prune its aliases, at which 1979 * point the inode will hopefully get dropped to. 1980 * 1981 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1982 * memory pressure from the MDS, though, so it needn't be perfect. 1983 */ 1984static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1985{ 1986 int *remaining = arg; 1987 struct ceph_inode_info *ci = ceph_inode(inode); 1988 int used, wanted, oissued, mine; 1989 1990 if (*remaining <= 0) 1991 return -1; 1992 1993 spin_lock(&ci->i_ceph_lock); 1994 mine = cap->issued | cap->implemented; 1995 used = __ceph_caps_used(ci); 1996 wanted = __ceph_caps_file_wanted(ci); 1997 oissued = __ceph_caps_issued_other(ci, cap); 1998 1999 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 2000 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 2001 ceph_cap_string(used), ceph_cap_string(wanted)); 2002 if (cap == ci->i_auth_cap) { 2003 if (ci->i_dirty_caps || ci->i_flushing_caps || 2004 !list_empty(&ci->i_cap_snaps)) 2005 goto out; 2006 if ((used | wanted) & CEPH_CAP_ANY_WR) 2007 goto out; 2008 /* Note: it's possible that i_filelock_ref becomes non-zero 2009 * after dropping auth caps. It doesn't hurt because reply 2010 * of lock mds request will re-add auth caps. */ 2011 if (atomic_read(&ci->i_filelock_ref) > 0) 2012 goto out; 2013 } 2014 /* The inode has cached pages, but it's no longer used. 2015 * we can safely drop it */ 2016 if (S_ISREG(inode->i_mode) && 2017 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 2018 !(oissued & CEPH_CAP_FILE_CACHE)) { 2019 used = 0; 2020 oissued = 0; 2021 } 2022 if ((used | wanted) & ~oissued & mine) 2023 goto out; /* we need these caps */ 2024 2025 if (oissued) { 2026 /* we aren't the only cap.. just remove us */ 2027 __ceph_remove_cap(cap, true); 2028 (*remaining)--; 2029 } else { 2030 struct dentry *dentry; 2031 /* try dropping referring dentries */ 2032 spin_unlock(&ci->i_ceph_lock); 2033 dentry = d_find_any_alias(inode); 2034 if (dentry && drop_negative_children(dentry)) { 2035 int count; 2036 dput(dentry); 2037 d_prune_aliases(inode); 2038 count = atomic_read(&inode->i_count); 2039 if (count == 1) 2040 (*remaining)--; 2041 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 2042 inode, cap, count); 2043 } else { 2044 dput(dentry); 2045 } 2046 return 0; 2047 } 2048 2049out: 2050 spin_unlock(&ci->i_ceph_lock); 2051 return 0; 2052} 2053 2054/* 2055 * Trim session cap count down to some max number. 
2056 */ 2057int ceph_trim_caps(struct ceph_mds_client *mdsc, 2058 struct ceph_mds_session *session, 2059 int max_caps) 2060{ 2061 int trim_caps = session->s_nr_caps - max_caps; 2062 2063 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2064 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2065 if (trim_caps > 0) { 2066 int remaining = trim_caps; 2067 2068 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2069 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2070 session->s_mds, session->s_nr_caps, max_caps, 2071 trim_caps - remaining); 2072 } 2073 2074 ceph_flush_cap_releases(mdsc, session); 2075 return 0; 2076} 2077 2078static int check_caps_flush(struct ceph_mds_client *mdsc, 2079 u64 want_flush_tid) 2080{ 2081 int ret = 1; 2082 2083 spin_lock(&mdsc->cap_dirty_lock); 2084 if (!list_empty(&mdsc->cap_flush_list)) { 2085 struct ceph_cap_flush *cf = 2086 list_first_entry(&mdsc->cap_flush_list, 2087 struct ceph_cap_flush, g_list); 2088 if (cf->tid <= want_flush_tid) { 2089 dout("check_caps_flush still flushing tid " 2090 "%llu <= %llu\n", cf->tid, want_flush_tid); 2091 ret = 0; 2092 } 2093 } 2094 spin_unlock(&mdsc->cap_dirty_lock); 2095 return ret; 2096} 2097 2098/* 2099 * flush all dirty inode data to disk. 2100 * 2101 * returns true if we've flushed through want_flush_tid 2102 */ 2103static void wait_caps_flush(struct ceph_mds_client *mdsc, 2104 u64 want_flush_tid) 2105{ 2106 dout("check_caps_flush want %llu\n", want_flush_tid); 2107 2108 wait_event(mdsc->cap_flushing_wq, 2109 check_caps_flush(mdsc, want_flush_tid)); 2110 2111 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2112} 2113 2114/* 2115 * called under s_mutex 2116 */ 2117static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2118 struct ceph_mds_session *session) 2119{ 2120 struct ceph_msg *msg = NULL; 2121 struct ceph_mds_cap_release *head; 2122 struct ceph_mds_cap_item *item; 2123 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2124 struct ceph_cap *cap; 2125 LIST_HEAD(tmp_list); 2126 int num_cap_releases; 2127 __le32 barrier, *cap_barrier; 2128 2129 down_read(&osdc->lock); 2130 barrier = cpu_to_le32(osdc->epoch_barrier); 2131 up_read(&osdc->lock); 2132 2133 spin_lock(&session->s_cap_lock); 2134again: 2135 list_splice_init(&session->s_cap_releases, &tmp_list); 2136 num_cap_releases = session->s_num_cap_releases; 2137 session->s_num_cap_releases = 0; 2138 spin_unlock(&session->s_cap_lock); 2139 2140 while (!list_empty(&tmp_list)) { 2141 if (!msg) { 2142 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2143 PAGE_SIZE, GFP_NOFS, false); 2144 if (!msg) 2145 goto out_err; 2146 head = msg->front.iov_base; 2147 head->num = cpu_to_le32(0); 2148 msg->front.iov_len = sizeof(*head); 2149 2150 msg->hdr.version = cpu_to_le16(2); 2151 msg->hdr.compat_version = cpu_to_le16(1); 2152 } 2153 2154 cap = list_first_entry(&tmp_list, struct ceph_cap, 2155 session_caps); 2156 list_del(&cap->session_caps); 2157 num_cap_releases--; 2158 2159 head = msg->front.iov_base; 2160 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2161 &head->num); 2162 item = msg->front.iov_base + msg->front.iov_len; 2163 item->ino = cpu_to_le64(cap->cap_ino); 2164 item->cap_id = cpu_to_le64(cap->cap_id); 2165 item->migrate_seq = cpu_to_le32(cap->mseq); 2166 item->seq = cpu_to_le32(cap->issue_seq); 2167 msg->front.iov_len += sizeof(*item); 2168 2169 ceph_put_cap(mdsc, cap); 2170 2171 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2172 // Append cap_barrier field 2173 cap_barrier = msg->front.iov_base + 
msg->front.iov_len; 2174 *cap_barrier = barrier; 2175 msg->front.iov_len += sizeof(*cap_barrier); 2176 2177 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2178 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2179 ceph_con_send(&session->s_con, msg); 2180 msg = NULL; 2181 } 2182 } 2183 2184 BUG_ON(num_cap_releases != 0); 2185 2186 spin_lock(&session->s_cap_lock); 2187 if (!list_empty(&session->s_cap_releases)) 2188 goto again; 2189 spin_unlock(&session->s_cap_lock); 2190 2191 if (msg) { 2192 // Append cap_barrier field 2193 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2194 *cap_barrier = barrier; 2195 msg->front.iov_len += sizeof(*cap_barrier); 2196 2197 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2198 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2199 ceph_con_send(&session->s_con, msg); 2200 } 2201 return; 2202out_err: 2203 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2204 session->s_mds); 2205 spin_lock(&session->s_cap_lock); 2206 list_splice(&tmp_list, &session->s_cap_releases); 2207 session->s_num_cap_releases += num_cap_releases; 2208 spin_unlock(&session->s_cap_lock); 2209} 2210 2211static void ceph_cap_release_work(struct work_struct *work) 2212{ 2213 struct ceph_mds_session *session = 2214 container_of(work, struct ceph_mds_session, s_cap_release_work); 2215 2216 mutex_lock(&session->s_mutex); 2217 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2218 session->s_state == CEPH_MDS_SESSION_HUNG) 2219 ceph_send_cap_releases(session->s_mdsc, session); 2220 mutex_unlock(&session->s_mutex); 2221 ceph_put_mds_session(session); 2222} 2223 2224void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2225 struct ceph_mds_session *session) 2226{ 2227 if (mdsc->stopping) 2228 return; 2229 2230 ceph_get_mds_session(session); 2231 if (queue_work(mdsc->fsc->cap_wq, 2232 &session->s_cap_release_work)) { 2233 dout("cap release work queued\n"); 2234 } else { 2235 ceph_put_mds_session(session); 2236 dout("failed to queue cap release work\n"); 2237 } 2238} 2239 2240/* 2241 * caller holds session->s_cap_lock 2242 */ 2243void __ceph_queue_cap_release(struct ceph_mds_session *session, 2244 struct ceph_cap *cap) 2245{ 2246 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2247 session->s_num_cap_releases++; 2248 2249 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2250 ceph_flush_cap_releases(session->s_mdsc, session); 2251} 2252 2253static void ceph_cap_reclaim_work(struct work_struct *work) 2254{ 2255 struct ceph_mds_client *mdsc = 2256 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2257 int ret = ceph_trim_dentries(mdsc); 2258 if (ret == -EAGAIN) 2259 ceph_queue_cap_reclaim_work(mdsc); 2260} 2261 2262void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2263{ 2264 if (mdsc->stopping) 2265 return; 2266 2267 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2268 dout("caps reclaim work queued\n"); 2269 } else { 2270 dout("failed to queue caps release work\n"); 2271 } 2272} 2273 2274void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2275{ 2276 int val; 2277 if (!nr) 2278 return; 2279 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2280 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2281 atomic_set(&mdsc->cap_reclaim_pending, 0); 2282 ceph_queue_cap_reclaim_work(mdsc); 2283 } 2284} 2285 2286/* 2287 * requests 2288 */ 2289 2290int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2291 struct inode *dir) 2292{ 2293 struct ceph_inode_info *ci = 
ceph_inode(dir); 2294 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2295 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2296 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2297 unsigned int num_entries; 2298 int order; 2299 2300 spin_lock(&ci->i_ceph_lock); 2301 num_entries = ci->i_files + ci->i_subdirs; 2302 spin_unlock(&ci->i_ceph_lock); 2303 num_entries = max(num_entries, 1U); 2304 num_entries = min(num_entries, opt->max_readdir); 2305 2306 order = get_order(size * num_entries); 2307 while (order >= 0) { 2308 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2309 __GFP_NOWARN, 2310 order); 2311 if (rinfo->dir_entries) 2312 break; 2313 order--; 2314 } 2315 if (!rinfo->dir_entries) 2316 return -ENOMEM; 2317 2318 num_entries = (PAGE_SIZE << order) / size; 2319 num_entries = min(num_entries, opt->max_readdir); 2320 2321 rinfo->dir_buf_size = PAGE_SIZE << order; 2322 req->r_num_caps = num_entries + 1; 2323 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2324 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2325 return 0; 2326} 2327 2328/* 2329 * Create an mds request. 2330 */ 2331struct ceph_mds_request * 2332ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2333{ 2334 struct ceph_mds_request *req; 2335 2336 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2337 if (!req) 2338 return ERR_PTR(-ENOMEM); 2339 2340 mutex_init(&req->r_fill_mutex); 2341 req->r_mdsc = mdsc; 2342 req->r_started = jiffies; 2343 req->r_start_latency = ktime_get(); 2344 req->r_resend_mds = -1; 2345 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2346 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2347 req->r_fmode = -1; 2348 kref_init(&req->r_kref); 2349 RB_CLEAR_NODE(&req->r_node); 2350 INIT_LIST_HEAD(&req->r_wait); 2351 init_completion(&req->r_completion); 2352 init_completion(&req->r_safe_completion); 2353 INIT_LIST_HEAD(&req->r_unsafe_item); 2354 2355 ktime_get_coarse_real_ts64(&req->r_stamp); 2356 2357 req->r_op = op; 2358 req->r_direct_mode = mode; 2359 return req; 2360} 2361 2362/* 2363 * return oldest (lowest) request, tid in request tree, 0 if none. 2364 * 2365 * called under mdsc->mutex. 2366 */ 2367static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2368{ 2369 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2370 return NULL; 2371 return rb_entry(rb_first(&mdsc->request_tree), 2372 struct ceph_mds_request, r_node); 2373} 2374 2375static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2376{ 2377 return mdsc->oldest_tid; 2378} 2379 2380/* 2381 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2382 * on build_path_from_dentry in fs/cifs/dir.c. 2383 * 2384 * If @stop_on_nosnap, generate path relative to the first non-snapped 2385 * inode. 2386 * 2387 * Encode hidden .snap dirs as a double /, i.e. 
2388 * foo/.snap/bar -> foo//bar 2389 */ 2390char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2391 int stop_on_nosnap) 2392{ 2393 struct dentry *temp; 2394 char *path; 2395 int pos; 2396 unsigned seq; 2397 u64 base; 2398 2399 if (!dentry) 2400 return ERR_PTR(-EINVAL); 2401 2402 path = __getname(); 2403 if (!path) 2404 return ERR_PTR(-ENOMEM); 2405retry: 2406 pos = PATH_MAX - 1; 2407 path[pos] = '\0'; 2408 2409 seq = read_seqbegin(&rename_lock); 2410 rcu_read_lock(); 2411 temp = dentry; 2412 for (;;) { 2413 struct inode *inode; 2414 2415 spin_lock(&temp->d_lock); 2416 inode = d_inode(temp); 2417 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2418 dout("build_path path+%d: %p SNAPDIR\n", 2419 pos, temp); 2420 } else if (stop_on_nosnap && inode && dentry != temp && 2421 ceph_snap(inode) == CEPH_NOSNAP) { 2422 spin_unlock(&temp->d_lock); 2423 pos++; /* get rid of any prepended '/' */ 2424 break; 2425 } else { 2426 pos -= temp->d_name.len; 2427 if (pos < 0) { 2428 spin_unlock(&temp->d_lock); 2429 break; 2430 } 2431 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2432 } 2433 spin_unlock(&temp->d_lock); 2434 temp = READ_ONCE(temp->d_parent); 2435 2436 /* Are we at the root? */ 2437 if (IS_ROOT(temp)) 2438 break; 2439 2440 /* Are we out of buffer? */ 2441 if (--pos < 0) 2442 break; 2443 2444 path[pos] = '/'; 2445 } 2446 base = ceph_ino(d_inode(temp)); 2447 rcu_read_unlock(); 2448 2449 if (read_seqretry(&rename_lock, seq)) 2450 goto retry; 2451 2452 if (pos < 0) { 2453 /* 2454 * A rename didn't occur, but somehow we didn't end up where 2455 * we thought we would. Throw a warning and try again. 2456 */ 2457 pr_warn("build_path did not end path lookup where " 2458 "expected, pos is %d\n", pos); 2459 goto retry; 2460 } 2461 2462 *pbase = base; 2463 *plen = PATH_MAX - 1 - pos; 2464 dout("build_path on %p %d built %llx '%.*s'\n", 2465 dentry, d_count(dentry), base, *plen, path + pos); 2466 return path + pos; 2467} 2468 2469static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2470 const char **ppath, int *ppathlen, u64 *pino, 2471 bool *pfreepath, bool parent_locked) 2472{ 2473 char *path; 2474 2475 rcu_read_lock(); 2476 if (!dir) 2477 dir = d_inode_rcu(dentry->d_parent); 2478 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2479 *pino = ceph_ino(dir); 2480 rcu_read_unlock(); 2481 *ppath = dentry->d_name.name; 2482 *ppathlen = dentry->d_name.len; 2483 return 0; 2484 } 2485 rcu_read_unlock(); 2486 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2487 if (IS_ERR(path)) 2488 return PTR_ERR(path); 2489 *ppath = path; 2490 *pfreepath = true; 2491 return 0; 2492} 2493 2494static int build_inode_path(struct inode *inode, 2495 const char **ppath, int *ppathlen, u64 *pino, 2496 bool *pfreepath) 2497{ 2498 struct dentry *dentry; 2499 char *path; 2500 2501 if (ceph_snap(inode) == CEPH_NOSNAP) { 2502 *pino = ceph_ino(inode); 2503 *ppathlen = 0; 2504 return 0; 2505 } 2506 dentry = d_find_alias(inode); 2507 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2508 dput(dentry); 2509 if (IS_ERR(path)) 2510 return PTR_ERR(path); 2511 *ppath = path; 2512 *pfreepath = true; 2513 return 0; 2514} 2515 2516/* 2517 * request arguments may be specified via an inode *, a dentry *, or 2518 * an explicit ino+path. 
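 *
 * Informally, the three cases handled below are:
 *
 *   rinode set            -> build_inode_path(): just the ino for a
 *                            non-snapped inode, otherwise a full path
 *   rdentry set           -> build_dentry_path(): parent ino + dentry
 *                            name when the locked parent is not
 *                            snapped, otherwise a full path
 *   rpath and/or rino set -> used verbatim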
2519 */ 2520static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2521 struct inode *rdiri, const char *rpath, 2522 u64 rino, const char **ppath, int *pathlen, 2523 u64 *ino, bool *freepath, bool parent_locked) 2524{ 2525 int r = 0; 2526 2527 if (rinode) { 2528 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2529 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2530 ceph_snap(rinode)); 2531 } else if (rdentry) { 2532 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2533 freepath, parent_locked); 2534 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2535 *ppath); 2536 } else if (rpath || rino) { 2537 *ino = rino; 2538 *ppath = rpath; 2539 *pathlen = rpath ? strlen(rpath) : 0; 2540 dout(" path %.*s\n", *pathlen, rpath); 2541 } 2542 2543 return r; 2544} 2545 2546/* 2547 * called under mdsc->mutex 2548 */ 2549static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 2550 struct ceph_mds_request *req, 2551 int mds, bool drop_cap_releases) 2552{ 2553 struct ceph_msg *msg; 2554 struct ceph_mds_request_head *head; 2555 const char *path1 = NULL; 2556 const char *path2 = NULL; 2557 u64 ino1 = 0, ino2 = 0; 2558 int pathlen1 = 0, pathlen2 = 0; 2559 bool freepath1 = false, freepath2 = false; 2560 int len; 2561 u16 releases; 2562 void *p, *end; 2563 int ret; 2564 2565 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2566 req->r_parent, req->r_path1, req->r_ino1.ino, 2567 &path1, &pathlen1, &ino1, &freepath1, 2568 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2569 &req->r_req_flags)); 2570 if (ret < 0) { 2571 msg = ERR_PTR(ret); 2572 goto out; 2573 } 2574 2575 /* If r_old_dentry is set, then assume that its parent is locked */ 2576 ret = set_request_path_attr(NULL, req->r_old_dentry, 2577 req->r_old_dentry_dir, 2578 req->r_path2, req->r_ino2.ino, 2579 &path2, &pathlen2, &ino2, &freepath2, true); 2580 if (ret < 0) { 2581 msg = ERR_PTR(ret); 2582 goto out_free1; 2583 } 2584 2585 len = sizeof(*head) + 2586 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2587 sizeof(struct ceph_timespec); 2588 2589 /* calculate (max) length for cap releases */ 2590 len += sizeof(struct ceph_mds_request_release) * 2591 (!!req->r_inode_drop + !!req->r_dentry_drop + 2592 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2593 if (req->r_dentry_drop) 2594 len += pathlen1; 2595 if (req->r_old_dentry_drop) 2596 len += pathlen2; 2597 2598 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2599 if (!msg) { 2600 msg = ERR_PTR(-ENOMEM); 2601 goto out_free2; 2602 } 2603 2604 msg->hdr.version = cpu_to_le16(2); 2605 msg->hdr.tid = cpu_to_le64(req->r_tid); 2606 2607 head = msg->front.iov_base; 2608 p = msg->front.iov_base + sizeof(*head); 2609 end = msg->front.iov_base + msg->front.iov_len; 2610 2611 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2612 head->op = cpu_to_le32(req->r_op); 2613 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 2614 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 2615 head->ino = cpu_to_le64(req->r_deleg_ino); 2616 head->args = req->r_args; 2617 2618 ceph_encode_filepath(&p, end, ino1, path1); 2619 ceph_encode_filepath(&p, end, ino2, path2); 2620 2621 /* make note of release offset, in case we need to replay */ 2622 req->r_request_release_offset = p - msg->front.iov_base; 2623 2624 /* cap releases */ 2625 releases = 0; 2626 if (req->r_inode_drop) 2627 releases += ceph_encode_inode_release(&p, 2628 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2629 mds, req->r_inode_drop, req->r_inode_unless, 2630 req->r_op == CEPH_MDS_OP_READDIR); 2631 if (req->r_dentry_drop) 2632 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2633 req->r_parent, mds, req->r_dentry_drop, 2634 req->r_dentry_unless); 2635 if (req->r_old_dentry_drop) 2636 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2637 req->r_old_dentry_dir, mds, 2638 req->r_old_dentry_drop, 2639 req->r_old_dentry_unless); 2640 if (req->r_old_inode_drop) 2641 releases += ceph_encode_inode_release(&p, 2642 d_inode(req->r_old_dentry), 2643 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2644 2645 if (drop_cap_releases) { 2646 releases = 0; 2647 p = msg->front.iov_base + req->r_request_release_offset; 2648 } 2649 2650 head->num_releases = cpu_to_le16(releases); 2651 2652 /* time stamp */ 2653 { 2654 struct ceph_timespec ts; 2655 ceph_encode_timespec64(&ts, &req->r_stamp); 2656 ceph_encode_copy(&p, &ts, sizeof(ts)); 2657 } 2658 2659 if (WARN_ON_ONCE(p > end)) { 2660 ceph_msg_put(msg); 2661 msg = ERR_PTR(-ERANGE); 2662 goto out_free2; 2663 } 2664 2665 msg->front.iov_len = p - msg->front.iov_base; 2666 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2667 2668 if (req->r_pagelist) { 2669 struct ceph_pagelist *pagelist = req->r_pagelist; 2670 ceph_msg_data_add_pagelist(msg, pagelist); 2671 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2672 } else { 2673 msg->hdr.data_len = 0; 2674 } 2675 2676 msg->hdr.data_off = cpu_to_le16(0); 2677 2678out_free2: 2679 if (freepath2) 2680 ceph_mdsc_free_path((char *)path2, pathlen2); 2681out_free1: 2682 if (freepath1) 2683 ceph_mdsc_free_path((char *)path1, pathlen1); 2684out: 2685 return msg; 2686} 2687 2688/* 2689 * called under mdsc->mutex if error, under no mutex if 2690 * success. 2691 */ 2692static void complete_request(struct ceph_mds_client *mdsc, 2693 struct ceph_mds_request *req) 2694{ 2695 req->r_end_latency = ktime_get(); 2696 2697 if (req->r_callback) 2698 req->r_callback(mdsc, req); 2699 complete_all(&req->r_completion); 2700} 2701 2702/* 2703 * called under mdsc->mutex 2704 */ 2705static int __prepare_send_request(struct ceph_mds_client *mdsc, 2706 struct ceph_mds_request *req, 2707 int mds, bool drop_cap_releases) 2708{ 2709 struct ceph_mds_request_head *rhead; 2710 struct ceph_msg *msg; 2711 int flags = 0; 2712 2713 req->r_attempts++; 2714 if (req->r_inode) { 2715 struct ceph_cap *cap = 2716 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2717 2718 if (cap) 2719 req->r_sent_on_mseq = cap->mseq; 2720 else 2721 req->r_sent_on_mseq = -1; 2722 } 2723 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2724 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2725 2726 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2727 void *p; 2728 /* 2729 * Replay. Do not regenerate message (and rebuild 2730 * paths, etc.); just use the original message. 2731 * Rebuilding paths will break for renames because 2732 * d_move mangles the src name. 
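 * Only the flags (CEPH_MDS_FLAG_REPLAY), the target ino, the retry
 * count, the (zeroed) release count and the timestamp are refreshed
 * below; the paths encoded in the original message are reused as-is.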
2733 */ 2734 msg = req->r_request; 2735 rhead = msg->front.iov_base; 2736 2737 flags = le32_to_cpu(rhead->flags); 2738 flags |= CEPH_MDS_FLAG_REPLAY; 2739 rhead->flags = cpu_to_le32(flags); 2740 2741 if (req->r_target_inode) 2742 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2743 2744 rhead->num_retry = req->r_attempts - 1; 2745 2746 /* remove cap/dentry releases from message */ 2747 rhead->num_releases = 0; 2748 2749 /* time stamp */ 2750 p = msg->front.iov_base + req->r_request_release_offset; 2751 { 2752 struct ceph_timespec ts; 2753 ceph_encode_timespec64(&ts, &req->r_stamp); 2754 ceph_encode_copy(&p, &ts, sizeof(ts)); 2755 } 2756 2757 msg->front.iov_len = p - msg->front.iov_base; 2758 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2759 return 0; 2760 } 2761 2762 if (req->r_request) { 2763 ceph_msg_put(req->r_request); 2764 req->r_request = NULL; 2765 } 2766 msg = create_request_message(mdsc, req, mds, drop_cap_releases); 2767 if (IS_ERR(msg)) { 2768 req->r_err = PTR_ERR(msg); 2769 return PTR_ERR(msg); 2770 } 2771 req->r_request = msg; 2772 2773 rhead = msg->front.iov_base; 2774 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2775 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2776 flags |= CEPH_MDS_FLAG_REPLAY; 2777 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2778 flags |= CEPH_MDS_FLAG_ASYNC; 2779 if (req->r_parent) 2780 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2781 rhead->flags = cpu_to_le32(flags); 2782 rhead->num_fwd = req->r_num_fwd; 2783 rhead->num_retry = req->r_attempts - 1; 2784 2785 dout(" r_parent = %p\n", req->r_parent); 2786 return 0; 2787} 2788 2789/* 2790 * called under mdsc->mutex 2791 */ 2792static int __send_request(struct ceph_mds_client *mdsc, 2793 struct ceph_mds_session *session, 2794 struct ceph_mds_request *req, 2795 bool drop_cap_releases) 2796{ 2797 int err; 2798 2799 err = __prepare_send_request(mdsc, req, session->s_mds, 2800 drop_cap_releases); 2801 if (!err) { 2802 ceph_msg_get(req->r_request); 2803 ceph_con_send(&session->s_con, req->r_request); 2804 } 2805 2806 return err; 2807} 2808 2809/* 2810 * send request, or put it on the appropriate wait list. 
2811 */ 2812static void __do_request(struct ceph_mds_client *mdsc, 2813 struct ceph_mds_request *req) 2814{ 2815 struct ceph_mds_session *session = NULL; 2816 int mds = -1; 2817 int err = 0; 2818 bool random; 2819 2820 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2821 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2822 __unregister_request(mdsc, req); 2823 return; 2824 } 2825 2826 if (req->r_timeout && 2827 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2828 dout("do_request timed out\n"); 2829 err = -ETIMEDOUT; 2830 goto finish; 2831 } 2832 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2833 dout("do_request forced umount\n"); 2834 err = -EIO; 2835 goto finish; 2836 } 2837 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2838 if (mdsc->mdsmap_err) { 2839 err = mdsc->mdsmap_err; 2840 dout("do_request mdsmap err %d\n", err); 2841 goto finish; 2842 } 2843 if (mdsc->mdsmap->m_epoch == 0) { 2844 dout("do_request no mdsmap, waiting for map\n"); 2845 list_add(&req->r_wait, &mdsc->waiting_for_map); 2846 return; 2847 } 2848 if (!(mdsc->fsc->mount_options->flags & 2849 CEPH_MOUNT_OPT_MOUNTWAIT) && 2850 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2851 err = -EHOSTUNREACH; 2852 goto finish; 2853 } 2854 } 2855 2856 put_request_session(req); 2857 2858 mds = __choose_mds(mdsc, req, &random); 2859 if (mds < 0 || 2860 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2861 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2862 err = -EJUKEBOX; 2863 goto finish; 2864 } 2865 dout("do_request no mds or not active, waiting for map\n"); 2866 list_add(&req->r_wait, &mdsc->waiting_for_map); 2867 return; 2868 } 2869 2870 /* get, open session */ 2871 session = __ceph_lookup_mds_session(mdsc, mds); 2872 if (!session) { 2873 session = register_session(mdsc, mds); 2874 if (IS_ERR(session)) { 2875 err = PTR_ERR(session); 2876 goto finish; 2877 } 2878 } 2879 req->r_session = ceph_get_mds_session(session); 2880 2881 dout("do_request mds%d session %p state %s\n", mds, session, 2882 ceph_session_state_name(session->s_state)); 2883 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2884 session->s_state != CEPH_MDS_SESSION_HUNG) { 2885 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2886 err = -EACCES; 2887 goto out_session; 2888 } 2889 /* 2890 * We cannot queue async requests since the caps and delegated 2891 * inodes are bound to the session. Just return -EJUKEBOX and 2892 * let the caller retry a sync request in that case. 
2893 */ 2894 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2895 err = -EJUKEBOX; 2896 goto out_session; 2897 } 2898 if (session->s_state == CEPH_MDS_SESSION_NEW || 2899 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2900 err = __open_session(mdsc, session); 2901 if (err) 2902 goto out_session; 2903 /* retry the same mds later */ 2904 if (random) 2905 req->r_resend_mds = mds; 2906 } 2907 list_add(&req->r_wait, &session->s_waiting); 2908 goto out_session; 2909 } 2910 2911 /* send request */ 2912 req->r_resend_mds = -1; /* forget any previous mds hint */ 2913 2914 if (req->r_request_started == 0) /* note request start time */ 2915 req->r_request_started = jiffies; 2916 2917 err = __send_request(mdsc, session, req, false); 2918 2919out_session: 2920 ceph_put_mds_session(session); 2921finish: 2922 if (err) { 2923 dout("__do_request early error %d\n", err); 2924 req->r_err = err; 2925 complete_request(mdsc, req); 2926 __unregister_request(mdsc, req); 2927 } 2928 return; 2929} 2930 2931/* 2932 * called under mdsc->mutex 2933 */ 2934static void __wake_requests(struct ceph_mds_client *mdsc, 2935 struct list_head *head) 2936{ 2937 struct ceph_mds_request *req; 2938 LIST_HEAD(tmp_list); 2939 2940 list_splice_init(head, &tmp_list); 2941 2942 while (!list_empty(&tmp_list)) { 2943 req = list_entry(tmp_list.next, 2944 struct ceph_mds_request, r_wait); 2945 list_del_init(&req->r_wait); 2946 dout(" wake request %p tid %llu\n", req, req->r_tid); 2947 __do_request(mdsc, req); 2948 } 2949} 2950 2951/* 2952 * Wake up threads with requests pending for @mds, so that they can 2953 * resubmit their requests to a possibly different mds. 2954 */ 2955static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2956{ 2957 struct ceph_mds_request *req; 2958 struct rb_node *p = rb_first(&mdsc->request_tree); 2959 2960 dout("kick_requests mds%d\n", mds); 2961 while (p) { 2962 req = rb_entry(p, struct ceph_mds_request, r_node); 2963 p = rb_next(p); 2964 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2965 continue; 2966 if (req->r_attempts > 0) 2967 continue; /* only new requests */ 2968 if (req->r_session && 2969 req->r_session->s_mds == mds) { 2970 dout(" kicking tid %llu\n", req->r_tid); 2971 list_del_init(&req->r_wait); 2972 __do_request(mdsc, req); 2973 } 2974 } 2975} 2976 2977int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2978 struct ceph_mds_request *req) 2979{ 2980 int err = 0; 2981 2982 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2983 if (req->r_inode) 2984 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2985 if (req->r_parent) { 2986 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2987 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
2988 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2989 spin_lock(&ci->i_ceph_lock); 2990 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2991 __ceph_touch_fmode(ci, mdsc, fmode); 2992 spin_unlock(&ci->i_ceph_lock); 2993 ihold(req->r_parent); 2994 } 2995 if (req->r_old_dentry_dir) 2996 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2997 CEPH_CAP_PIN); 2998 2999 if (req->r_inode) { 3000 err = ceph_wait_on_async_create(req->r_inode); 3001 if (err) { 3002 dout("%s: wait for async create returned: %d\n", 3003 __func__, err); 3004 return err; 3005 } 3006 } 3007 3008 if (!err && req->r_old_inode) { 3009 err = ceph_wait_on_async_create(req->r_old_inode); 3010 if (err) { 3011 dout("%s: wait for async create returned: %d\n", 3012 __func__, err); 3013 return err; 3014 } 3015 } 3016 3017 dout("submit_request on %p for inode %p\n", req, dir); 3018 mutex_lock(&mdsc->mutex); 3019 __register_request(mdsc, req, dir); 3020 __do_request(mdsc, req); 3021 err = req->r_err; 3022 mutex_unlock(&mdsc->mutex); 3023 return err; 3024} 3025 3026static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3027 struct ceph_mds_request *req) 3028{ 3029 int err; 3030 3031 /* wait */ 3032 dout("do_request waiting\n"); 3033 if (!req->r_timeout && req->r_wait_for_completion) { 3034 err = req->r_wait_for_completion(mdsc, req); 3035 } else { 3036 long timeleft = wait_for_completion_killable_timeout( 3037 &req->r_completion, 3038 ceph_timeout_jiffies(req->r_timeout)); 3039 if (timeleft > 0) 3040 err = 0; 3041 else if (!timeleft) 3042 err = -ETIMEDOUT; /* timed out */ 3043 else 3044 err = timeleft; /* killed */ 3045 } 3046 dout("do_request waited, got %d\n", err); 3047 mutex_lock(&mdsc->mutex); 3048 3049 /* only abort if we didn't race with a real reply */ 3050 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3051 err = le32_to_cpu(req->r_reply_info.head->result); 3052 } else if (err < 0) { 3053 dout("aborted request %lld with %d\n", req->r_tid, err); 3054 3055 /* 3056 * ensure we aren't running concurrently with 3057 * ceph_fill_trace or ceph_readdir_prepopulate, which 3058 * rely on locks (dir mutex) held by our caller. 3059 */ 3060 mutex_lock(&req->r_fill_mutex); 3061 req->r_err = err; 3062 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3063 mutex_unlock(&req->r_fill_mutex); 3064 3065 if (req->r_parent && 3066 (req->r_op & CEPH_MDS_OP_WRITE)) 3067 ceph_invalidate_dir_request(req); 3068 } else { 3069 err = req->r_err; 3070 } 3071 3072 mutex_unlock(&mdsc->mutex); 3073 return err; 3074} 3075 3076/* 3077 * Synchrously perform an mds request. Take care of all of the 3078 * session setup, forwarding, retry details. 3079 */ 3080int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3081 struct inode *dir, 3082 struct ceph_mds_request *req) 3083{ 3084 int err; 3085 3086 dout("do_request on %p\n", req); 3087 3088 /* issue */ 3089 err = ceph_mdsc_submit_request(mdsc, dir, req); 3090 if (!err) 3091 err = ceph_mdsc_wait_request(mdsc, req); 3092 dout("do_request %p done, result %d\n", req, err); 3093 return err; 3094} 3095 3096/* 3097 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3098 * namespace request. 
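 *
 * The MDS may or may not have applied the aborted operation, so our
 * cached view of the directory can no longer be trusted; clearing the
 * completeness flag and the dentry leases forces later lookups and
 * readdirs back to the MDS.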
3099 */ 3100void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3101{ 3102 struct inode *dir = req->r_parent; 3103 struct inode *old_dir = req->r_old_dentry_dir; 3104 3105 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3106 3107 ceph_dir_clear_complete(dir); 3108 if (old_dir) 3109 ceph_dir_clear_complete(old_dir); 3110 if (req->r_dentry) 3111 ceph_invalidate_dentry_lease(req->r_dentry); 3112 if (req->r_old_dentry) 3113 ceph_invalidate_dentry_lease(req->r_old_dentry); 3114} 3115 3116/* 3117 * Handle mds reply. 3118 * 3119 * We take the session mutex and parse and process the reply immediately. 3120 * This preserves the logical ordering of replies, capabilities, etc., sent 3121 * by the MDS as they are applied to our local cache. 3122 */ 3123static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3124{ 3125 struct ceph_mds_client *mdsc = session->s_mdsc; 3126 struct ceph_mds_request *req; 3127 struct ceph_mds_reply_head *head = msg->front.iov_base; 3128 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3129 struct ceph_snap_realm *realm; 3130 u64 tid; 3131 int err, result; 3132 int mds = session->s_mds; 3133 3134 if (msg->front.iov_len < sizeof(*head)) { 3135 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3136 ceph_msg_dump(msg); 3137 return; 3138 } 3139 3140 /* get request, session */ 3141 tid = le64_to_cpu(msg->hdr.tid); 3142 mutex_lock(&mdsc->mutex); 3143 req = lookup_get_request(mdsc, tid); 3144 if (!req) { 3145 dout("handle_reply on unknown tid %llu\n", tid); 3146 mutex_unlock(&mdsc->mutex); 3147 return; 3148 } 3149 dout("handle_reply %p\n", req); 3150 3151 /* correct session? */ 3152 if (req->r_session != session) { 3153 pr_err("mdsc_handle_reply got %llu on session mds%d" 3154 " not mds%d\n", tid, session->s_mds, 3155 req->r_session ? req->r_session->s_mds : -1); 3156 mutex_unlock(&mdsc->mutex); 3157 goto out; 3158 } 3159 3160 /* dup? */ 3161 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3162 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3163 pr_warn("got a dup %s reply on %llu from mds%d\n", 3164 head->safe ? 
"safe" : "unsafe", tid, mds); 3165 mutex_unlock(&mdsc->mutex); 3166 goto out; 3167 } 3168 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3169 pr_warn("got unsafe after safe on %llu from mds%d\n", 3170 tid, mds); 3171 mutex_unlock(&mdsc->mutex); 3172 goto out; 3173 } 3174 3175 result = le32_to_cpu(head->result); 3176 3177 /* 3178 * Handle an ESTALE 3179 * if we're not talking to the authority, send to them 3180 * if the authority has changed while we weren't looking, 3181 * send to new authority 3182 * Otherwise we just have to return an ESTALE 3183 */ 3184 if (result == -ESTALE) { 3185 dout("got ESTALE on request %llu\n", req->r_tid); 3186 req->r_resend_mds = -1; 3187 if (req->r_direct_mode != USE_AUTH_MDS) { 3188 dout("not using auth, setting for that now\n"); 3189 req->r_direct_mode = USE_AUTH_MDS; 3190 __do_request(mdsc, req); 3191 mutex_unlock(&mdsc->mutex); 3192 goto out; 3193 } else { 3194 int mds = __choose_mds(mdsc, req, NULL); 3195 if (mds >= 0 && mds != req->r_session->s_mds) { 3196 dout("but auth changed, so resending\n"); 3197 __do_request(mdsc, req); 3198 mutex_unlock(&mdsc->mutex); 3199 goto out; 3200 } 3201 } 3202 dout("have to return ESTALE on request %llu\n", req->r_tid); 3203 } 3204 3205 3206 if (head->safe) { 3207 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3208 __unregister_request(mdsc, req); 3209 3210 /* last request during umount? */ 3211 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3212 complete_all(&mdsc->safe_umount_waiters); 3213 3214 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3215 /* 3216 * We already handled the unsafe response, now do the 3217 * cleanup. No need to examine the response; the MDS 3218 * doesn't include any result info in the safe 3219 * response. And even if it did, there is nothing 3220 * useful we could do with a revised return value. 
3221 */ 3222 dout("got safe reply %llu, mds%d\n", tid, mds); 3223 3224 mutex_unlock(&mdsc->mutex); 3225 goto out; 3226 } 3227 } else { 3228 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3229 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3230 } 3231 3232 dout("handle_reply tid %lld result %d\n", tid, result); 3233 rinfo = &req->r_reply_info; 3234 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3235 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3236 else 3237 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3238 mutex_unlock(&mdsc->mutex); 3239 3240 mutex_lock(&session->s_mutex); 3241 if (err < 0) { 3242 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3243 ceph_msg_dump(msg); 3244 goto out_err; 3245 } 3246 3247 /* snap trace */ 3248 realm = NULL; 3249 if (rinfo->snapblob_len) { 3250 down_write(&mdsc->snap_rwsem); 3251 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3252 rinfo->snapblob + rinfo->snapblob_len, 3253 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3254 &realm); 3255 downgrade_write(&mdsc->snap_rwsem); 3256 } else { 3257 down_read(&mdsc->snap_rwsem); 3258 } 3259 3260 /* insert trace into our cache */ 3261 mutex_lock(&req->r_fill_mutex); 3262 current->journal_info = req; 3263 err = ceph_fill_trace(mdsc->fsc->sb, req); 3264 if (err == 0) { 3265 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3266 req->r_op == CEPH_MDS_OP_LSSNAP)) 3267 ceph_readdir_prepopulate(req, req->r_session); 3268 } 3269 current->journal_info = NULL; 3270 mutex_unlock(&req->r_fill_mutex); 3271 3272 up_read(&mdsc->snap_rwsem); 3273 if (realm) 3274 ceph_put_snap_realm(mdsc, realm); 3275 3276 if (err == 0) { 3277 if (req->r_target_inode && 3278 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3279 struct ceph_inode_info *ci = 3280 ceph_inode(req->r_target_inode); 3281 spin_lock(&ci->i_unsafe_lock); 3282 list_add_tail(&req->r_unsafe_target_item, 3283 &ci->i_unsafe_iops); 3284 spin_unlock(&ci->i_unsafe_lock); 3285 } 3286 3287 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3288 } 3289out_err: 3290 mutex_lock(&mdsc->mutex); 3291 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3292 if (err) { 3293 req->r_err = err; 3294 } else { 3295 req->r_reply = ceph_msg_get(msg); 3296 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3297 } 3298 } else { 3299 dout("reply arrived after request %lld was aborted\n", tid); 3300 } 3301 mutex_unlock(&mdsc->mutex); 3302 3303 mutex_unlock(&session->s_mutex); 3304 3305 /* kick calling process */ 3306 complete_request(mdsc, req); 3307 3308 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency, 3309 req->r_end_latency, err); 3310out: 3311 ceph_mdsc_put_request(req); 3312 return; 3313} 3314 3315 3316 3317/* 3318 * handle mds notification that our request has been forwarded. 3319 */ 3320static void handle_forward(struct ceph_mds_client *mdsc, 3321 struct ceph_mds_session *session, 3322 struct ceph_msg *msg) 3323{ 3324 struct ceph_mds_request *req; 3325 u64 tid = le64_to_cpu(msg->hdr.tid); 3326 u32 next_mds; 3327 u32 fwd_seq; 3328 int err = -EINVAL; 3329 void *p = msg->front.iov_base; 3330 void *end = p + msg->front.iov_len; 3331 3332 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3333 next_mds = ceph_decode_32(&p); 3334 fwd_seq = ceph_decode_32(&p); 3335 3336 mutex_lock(&mdsc->mutex); 3337 req = lookup_get_request(mdsc, tid); 3338 if (!req) { 3339 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3340 goto out; /* dup reply? 
*/ 3341 } 3342 3343 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3344 dout("forward tid %llu aborted, unregistering\n", tid); 3345 __unregister_request(mdsc, req); 3346 } else if (fwd_seq <= req->r_num_fwd) { 3347 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3348 tid, next_mds, req->r_num_fwd, fwd_seq); 3349 } else { 3350 /* resend. forward race not possible; mds would drop */ 3351 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3352 BUG_ON(req->r_err); 3353 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3354 req->r_attempts = 0; 3355 req->r_num_fwd = fwd_seq; 3356 req->r_resend_mds = next_mds; 3357 put_request_session(req); 3358 __do_request(mdsc, req); 3359 } 3360 ceph_mdsc_put_request(req); 3361out: 3362 mutex_unlock(&mdsc->mutex); 3363 return; 3364 3365bad: 3366 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3367} 3368 3369static int __decode_session_metadata(void **p, void *end, 3370 bool *blocklisted) 3371{ 3372 /* map<string,string> */ 3373 u32 n; 3374 bool err_str; 3375 ceph_decode_32_safe(p, end, n, bad); 3376 while (n-- > 0) { 3377 u32 len; 3378 ceph_decode_32_safe(p, end, len, bad); 3379 ceph_decode_need(p, end, len, bad); 3380 err_str = !strncmp(*p, "error_string", len); 3381 *p += len; 3382 ceph_decode_32_safe(p, end, len, bad); 3383 ceph_decode_need(p, end, len, bad); 3384 /* 3385 * Match "blocklisted (blacklisted)" from newer MDSes, 3386 * or "blacklisted" from older MDSes. 3387 */ 3388 if (err_str && strnstr(*p, "blacklisted", len)) 3389 *blocklisted = true; 3390 *p += len; 3391 } 3392 return 0; 3393bad: 3394 return -1; 3395} 3396 3397/* 3398 * handle a mds session control message 3399 */ 3400static void handle_session(struct ceph_mds_session *session, 3401 struct ceph_msg *msg) 3402{ 3403 struct ceph_mds_client *mdsc = session->s_mdsc; 3404 int mds = session->s_mds; 3405 int msg_version = le16_to_cpu(msg->hdr.version); 3406 void *p = msg->front.iov_base; 3407 void *end = p + msg->front.iov_len; 3408 struct ceph_mds_session_head *h; 3409 u32 op; 3410 u64 seq, features = 0; 3411 int wake = 0; 3412 bool blocklisted = false; 3413 3414 /* decode */ 3415 ceph_decode_need(&p, end, sizeof(*h), bad); 3416 h = p; 3417 p += sizeof(*h); 3418 3419 op = le32_to_cpu(h->op); 3420 seq = le64_to_cpu(h->seq); 3421 3422 if (msg_version >= 3) { 3423 u32 len; 3424 /* version >= 2, metadata */ 3425 if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3426 goto bad; 3427 /* version >= 3, feature bits */ 3428 ceph_decode_32_safe(&p, end, len, bad); 3429 if (len) { 3430 ceph_decode_64_safe(&p, end, features, bad); 3431 p += len - sizeof(features); 3432 } 3433 } 3434 3435 mutex_lock(&mdsc->mutex); 3436 if (op == CEPH_SESSION_CLOSE) { 3437 ceph_get_mds_session(session); 3438 __unregister_session(mdsc, session); 3439 } 3440 /* FIXME: this ttl calculation is generous */ 3441 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3442 mutex_unlock(&mdsc->mutex); 3443 3444 mutex_lock(&session->s_mutex); 3445 3446 dout("handle_session mds%d %s %p state %s seq %llu\n", 3447 mds, ceph_session_op_name(op), session, 3448 ceph_session_state_name(session->s_state), seq); 3449 3450 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3451 session->s_state = CEPH_MDS_SESSION_OPEN; 3452 pr_info("mds%d came back\n", session->s_mds); 3453 } 3454 3455 switch (op) { 3456 case CEPH_SESSION_OPEN: 3457 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3458 pr_info("mds%d reconnect success\n", session->s_mds); 3459 session->s_state = 
CEPH_MDS_SESSION_OPEN; 3460 session->s_features = features; 3461 renewed_caps(mdsc, session, 0); 3462 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3463 metric_schedule_delayed(&mdsc->metric); 3464 wake = 1; 3465 if (mdsc->stopping) 3466 __close_session(mdsc, session); 3467 break; 3468 3469 case CEPH_SESSION_RENEWCAPS: 3470 if (session->s_renew_seq == seq) 3471 renewed_caps(mdsc, session, 1); 3472 break; 3473 3474 case CEPH_SESSION_CLOSE: 3475 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3476 pr_info("mds%d reconnect denied\n", session->s_mds); 3477 session->s_state = CEPH_MDS_SESSION_CLOSED; 3478 cleanup_session_requests(mdsc, session); 3479 remove_session_caps(session); 3480 wake = 2; /* for good measure */ 3481 wake_up_all(&mdsc->session_close_wq); 3482 break; 3483 3484 case CEPH_SESSION_STALE: 3485 pr_info("mds%d caps went stale, renewing\n", 3486 session->s_mds); 3487 spin_lock(&session->s_gen_ttl_lock); 3488 session->s_cap_gen++; 3489 session->s_cap_ttl = jiffies - 1; 3490 spin_unlock(&session->s_gen_ttl_lock); 3491 send_renew_caps(mdsc, session); 3492 break; 3493 3494 case CEPH_SESSION_RECALL_STATE: 3495 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3496 break; 3497 3498 case CEPH_SESSION_FLUSHMSG: 3499 /* flush cap releases */ 3500 spin_lock(&session->s_cap_lock); 3501 if (session->s_num_cap_releases) 3502 ceph_flush_cap_releases(mdsc, session); 3503 spin_unlock(&session->s_cap_lock); 3504 3505 send_flushmsg_ack(mdsc, session, seq); 3506 break; 3507 3508 case CEPH_SESSION_FORCE_RO: 3509 dout("force_session_readonly %p\n", session); 3510 spin_lock(&session->s_cap_lock); 3511 session->s_readonly = true; 3512 spin_unlock(&session->s_cap_lock); 3513 wake_up_session_caps(session, FORCE_RO); 3514 break; 3515 3516 case CEPH_SESSION_REJECT: 3517 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3518 pr_info("mds%d rejected session\n", session->s_mds); 3519 session->s_state = CEPH_MDS_SESSION_REJECTED; 3520 cleanup_session_requests(mdsc, session); 3521 remove_session_caps(session); 3522 if (blocklisted) 3523 mdsc->fsc->blocklisted = true; 3524 wake = 2; /* for good measure */ 3525 break; 3526 3527 default: 3528 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3529 WARN_ON(1); 3530 } 3531 3532 mutex_unlock(&session->s_mutex); 3533 if (wake) { 3534 mutex_lock(&mdsc->mutex); 3535 __wake_requests(mdsc, &session->s_waiting); 3536 if (wake == 2) 3537 kick_requests(mdsc, mds); 3538 mutex_unlock(&mdsc->mutex); 3539 } 3540 if (op == CEPH_SESSION_CLOSE) 3541 ceph_put_mds_session(session); 3542 return; 3543 3544bad: 3545 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3546 (int)msg->front.iov_len); 3547 ceph_msg_dump(msg); 3548 return; 3549} 3550 3551void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3552{ 3553 int dcaps; 3554 3555 dcaps = xchg(&req->r_dir_caps, 0); 3556 if (dcaps) { 3557 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3558 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3559 } 3560} 3561 3562void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3563{ 3564 int dcaps; 3565 3566 dcaps = xchg(&req->r_dir_caps, 0); 3567 if (dcaps) { 3568 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3569 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3570 dcaps); 3571 } 3572} 3573 3574/* 3575 * called under session->mutex. 
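 *
 * Resend everything the recovering MDS may not have committed: the
 * requests on s_unsafe, plus any already-sent requests for this mds
 * that are still in the request tree, so the MDS can replay them in
 * its clientreplay stage.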
3576 */ 3577static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3578 struct ceph_mds_session *session) 3579{ 3580 struct ceph_mds_request *req, *nreq; 3581 struct rb_node *p; 3582 3583 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3584 3585 mutex_lock(&mdsc->mutex); 3586 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3587 __send_request(mdsc, session, req, true); 3588 3589 /* 3590 * also re-send old requests when MDS enters reconnect stage. So that MDS 3591 * can process completed request in clientreplay stage. 3592 */ 3593 p = rb_first(&mdsc->request_tree); 3594 while (p) { 3595 req = rb_entry(p, struct ceph_mds_request, r_node); 3596 p = rb_next(p); 3597 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3598 continue; 3599 if (req->r_attempts == 0) 3600 continue; /* only old requests */ 3601 if (!req->r_session) 3602 continue; 3603 if (req->r_session->s_mds != session->s_mds) 3604 continue; 3605 3606 ceph_mdsc_release_dir_caps_no_check(req); 3607 3608 __send_request(mdsc, session, req, true); 3609 } 3610 mutex_unlock(&mdsc->mutex); 3611} 3612 3613static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3614{ 3615 struct ceph_msg *reply; 3616 struct ceph_pagelist *_pagelist; 3617 struct page *page; 3618 __le32 *addr; 3619 int err = -ENOMEM; 3620 3621 if (!recon_state->allow_multi) 3622 return -ENOSPC; 3623 3624 /* can't handle message that contains both caps and realm */ 3625 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3626 3627 /* pre-allocate new pagelist */ 3628 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3629 if (!_pagelist) 3630 return -ENOMEM; 3631 3632 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3633 if (!reply) 3634 goto fail_msg; 3635 3636 /* placeholder for nr_caps */ 3637 err = ceph_pagelist_encode_32(_pagelist, 0); 3638 if (err < 0) 3639 goto fail; 3640 3641 if (recon_state->nr_caps) { 3642 /* currently encoding caps */ 3643 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3644 if (err) 3645 goto fail; 3646 } else { 3647 /* placeholder for nr_realms (currently encoding relams) */ 3648 err = ceph_pagelist_encode_32(_pagelist, 0); 3649 if (err < 0) 3650 goto fail; 3651 } 3652 3653 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3654 if (err) 3655 goto fail; 3656 3657 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3658 addr = kmap_atomic(page); 3659 if (recon_state->nr_caps) { 3660 /* currently encoding caps */ 3661 *addr = cpu_to_le32(recon_state->nr_caps); 3662 } else { 3663 /* currently encoding relams */ 3664 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3665 } 3666 kunmap_atomic(addr); 3667 3668 reply->hdr.version = cpu_to_le16(5); 3669 reply->hdr.compat_version = cpu_to_le16(4); 3670 3671 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3672 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3673 3674 ceph_con_send(&recon_state->session->s_con, reply); 3675 ceph_pagelist_release(recon_state->pagelist); 3676 3677 recon_state->pagelist = _pagelist; 3678 recon_state->nr_caps = 0; 3679 recon_state->nr_realms = 0; 3680 recon_state->msg_version = 5; 3681 return 0; 3682fail: 3683 ceph_msg_put(reply); 3684fail_msg: 3685 ceph_pagelist_release(_pagelist); 3686 return err; 3687} 3688 3689static struct dentry* d_find_primary(struct inode *inode) 3690{ 3691 struct dentry *alias, *dn = NULL; 3692 3693 if (hlist_empty(&inode->i_dentry)) 3694 return NULL; 3695 3696 spin_lock(&inode->i_lock); 3697 if 
(hlist_empty(&inode->i_dentry)) 3698 goto out_unlock; 3699 3700 if (S_ISDIR(inode->i_mode)) { 3701 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3702 if (!IS_ROOT(alias)) 3703 dn = dget(alias); 3704 goto out_unlock; 3705 } 3706 3707 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3708 spin_lock(&alias->d_lock); 3709 if (!d_unhashed(alias) && 3710 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3711 dn = dget_dlock(alias); 3712 } 3713 spin_unlock(&alias->d_lock); 3714 if (dn) 3715 break; 3716 } 3717out_unlock: 3718 spin_unlock(&inode->i_lock); 3719 return dn; 3720} 3721 3722/* 3723 * Encode information about a cap for a reconnect with the MDS. 3724 */ 3725static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3726 void *arg) 3727{ 3728 union { 3729 struct ceph_mds_cap_reconnect v2; 3730 struct ceph_mds_cap_reconnect_v1 v1; 3731 } rec; 3732 struct ceph_inode_info *ci = cap->ci; 3733 struct ceph_reconnect_state *recon_state = arg; 3734 struct ceph_pagelist *pagelist = recon_state->pagelist; 3735 struct dentry *dentry; 3736 char *path; 3737 int pathlen = 0, err; 3738 u64 pathbase; 3739 u64 snap_follows; 3740 3741 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3742 inode, ceph_vinop(inode), cap, cap->cap_id, 3743 ceph_cap_string(cap->issued)); 3744 3745 dentry = d_find_primary(inode); 3746 if (dentry) { 3747 /* set pathbase to parent dir when msg_version >= 2 */ 3748 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3749 recon_state->msg_version >= 2); 3750 dput(dentry); 3751 if (IS_ERR(path)) { 3752 err = PTR_ERR(path); 3753 goto out_err; 3754 } 3755 } else { 3756 path = NULL; 3757 pathbase = 0; 3758 } 3759 3760 spin_lock(&ci->i_ceph_lock); 3761 cap->seq = 0; /* reset cap seq */ 3762 cap->issue_seq = 0; /* and issue_seq */ 3763 cap->mseq = 0; /* and migrate_seq */ 3764 cap->cap_gen = cap->session->s_cap_gen; 3765 3766 /* These are lost when the session goes away */ 3767 if (S_ISDIR(inode->i_mode)) { 3768 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3769 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3770 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3771 } 3772 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3773 } 3774 3775 if (recon_state->msg_version >= 2) { 3776 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3777 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3778 rec.v2.issued = cpu_to_le32(cap->issued); 3779 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3780 rec.v2.pathbase = cpu_to_le64(pathbase); 3781 rec.v2.flock_len = (__force __le32) 3782 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3783 } else { 3784 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3785 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3786 rec.v1.issued = cpu_to_le32(cap->issued); 3787 rec.v1.size = cpu_to_le64(inode->i_size); 3788 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3789 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3790 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3791 rec.v1.pathbase = cpu_to_le64(pathbase); 3792 } 3793 3794 if (list_empty(&ci->i_cap_snaps)) { 3795 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3796 } else { 3797 struct ceph_cap_snap *capsnap = 3798 list_first_entry(&ci->i_cap_snaps, 3799 struct ceph_cap_snap, ci_item); 3800 snap_follows = capsnap->follows; 3801 } 3802 spin_unlock(&ci->i_ceph_lock); 3803 3804 if (recon_state->msg_version >= 2) { 3805 int num_fcntl_locks, num_flock_locks; 3806 struct ceph_filelock *flocks = NULL; 3807 size_t struct_len, total_len = sizeof(u64); 3808 u8 struct_v = 0; 3809 3810encode_again: 3811 if (rec.v2.flock_len) { 3812 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3813 } else { 3814 num_fcntl_locks = 0; 3815 num_flock_locks = 0; 3816 } 3817 if (num_fcntl_locks + num_flock_locks > 0) { 3818 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3819 sizeof(struct ceph_filelock), 3820 GFP_NOFS); 3821 if (!flocks) { 3822 err = -ENOMEM; 3823 goto out_err; 3824 } 3825 err = ceph_encode_locks_to_buffer(inode, flocks, 3826 num_fcntl_locks, 3827 num_flock_locks); 3828 if (err) { 3829 kfree(flocks); 3830 flocks = NULL; 3831 if (err == -ENOSPC) 3832 goto encode_again; 3833 goto out_err; 3834 } 3835 } else { 3836 kfree(flocks); 3837 flocks = NULL; 3838 } 3839 3840 if (recon_state->msg_version >= 3) { 3841 /* version, compat_version and struct_len */ 3842 total_len += 2 * sizeof(u8) + sizeof(u32); 3843 struct_v = 2; 3844 } 3845 /* 3846 * number of encoded locks is stable, so copy to pagelist 3847 */ 3848 struct_len = 2 * sizeof(u32) + 3849 (num_fcntl_locks + num_flock_locks) * 3850 sizeof(struct ceph_filelock); 3851 rec.v2.flock_len = cpu_to_le32(struct_len); 3852 3853 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 3854 3855 if (struct_v >= 2) 3856 struct_len += sizeof(u64); /* snap_follows */ 3857 3858 total_len += struct_len; 3859 3860 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3861 err = send_reconnect_partial(recon_state); 3862 if (err) 3863 goto out_freeflocks; 3864 pagelist = recon_state->pagelist; 3865 } 3866 3867 err = ceph_pagelist_reserve(pagelist, total_len); 3868 if (err) 3869 goto out_freeflocks; 3870 3871 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3872 if (recon_state->msg_version >= 3) { 3873 ceph_pagelist_encode_8(pagelist, struct_v); 3874 ceph_pagelist_encode_8(pagelist, 1); 3875 ceph_pagelist_encode_32(pagelist, struct_len); 3876 } 3877 ceph_pagelist_encode_string(pagelist, path, pathlen); 3878 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3879 ceph_locks_to_pagelist(flocks, pagelist, 3880 num_fcntl_locks, num_flock_locks); 3881 if (struct_v >= 2) 3882 ceph_pagelist_encode_64(pagelist, snap_follows); 3883out_freeflocks: 3884 kfree(flocks); 3885 } else { 3886 err = ceph_pagelist_reserve(pagelist, 3887 sizeof(u64) + sizeof(u32) + 3888 pathlen + sizeof(rec.v1)); 3889 if (err) 3890 goto out_err; 3891 3892 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3893 ceph_pagelist_encode_string(pagelist, path, pathlen); 3894 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3895 } 3896 3897out_err: 3898 ceph_mdsc_free_path(path, pathlen); 3899 if (!err) 3900 recon_state->nr_caps++; 3901 return err; 3902} 3903 3904static int encode_snap_realms(struct ceph_mds_client *mdsc, 3905 struct ceph_reconnect_state *recon_state) 3906{ 3907 struct rb_node *p; 3908 struct ceph_pagelist *pagelist = recon_state->pagelist; 3909 int err = 0; 3910 3911 if (recon_state->msg_version >= 4) { 3912 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3913 if (err < 0) 3914 goto fail; 3915 } 3916 3917 /* 3918 * snaprealms. 
we provide mds with the ino, seq (version), and 3919 * parent for all of our realms. If the mds has any newer info, 3920 * it will tell us. 3921 */ 3922 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3923 struct ceph_snap_realm *realm = 3924 rb_entry(p, struct ceph_snap_realm, node); 3925 struct ceph_mds_snaprealm_reconnect sr_rec; 3926 3927 if (recon_state->msg_version >= 4) { 3928 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3929 sizeof(sr_rec); 3930 3931 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3932 err = send_reconnect_partial(recon_state); 3933 if (err) 3934 goto fail; 3935 pagelist = recon_state->pagelist; 3936 } 3937 3938 err = ceph_pagelist_reserve(pagelist, need); 3939 if (err) 3940 goto fail; 3941 3942 ceph_pagelist_encode_8(pagelist, 1); 3943 ceph_pagelist_encode_8(pagelist, 1); 3944 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3945 } 3946 3947 dout(" adding snap realm %llx seq %lld parent %llx\n", 3948 realm->ino, realm->seq, realm->parent_ino); 3949 sr_rec.ino = cpu_to_le64(realm->ino); 3950 sr_rec.seq = cpu_to_le64(realm->seq); 3951 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3952 3953 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3954 if (err) 3955 goto fail; 3956 3957 recon_state->nr_realms++; 3958 } 3959fail: 3960 return err; 3961} 3962 3963 3964/* 3965 * If an MDS fails and recovers, clients need to reconnect in order to 3966 * reestablish shared state. This includes all caps issued through 3967 * this session _and_ the snap_realm hierarchy. Because it's not 3968 * clear which snap realms the mds cares about, we send everything we 3969 * know about.. that ensures we'll then get any new info the 3970 * recovering MDS might have. 3971 * 3972 * This is a relatively heavyweight operation, but it's rare. 3973 */ 3974static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3975 struct ceph_mds_session *session) 3976{ 3977 struct ceph_msg *reply; 3978 int mds = session->s_mds; 3979 int err = -ENOMEM; 3980 struct ceph_reconnect_state recon_state = { 3981 .session = session, 3982 }; 3983 LIST_HEAD(dispose); 3984 3985 pr_info("mds%d reconnect start\n", mds); 3986 3987 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3988 if (!recon_state.pagelist) 3989 goto fail_nopagelist; 3990 3991 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3992 if (!reply) 3993 goto fail_nomsg; 3994 3995 xa_destroy(&session->s_delegated_inos); 3996 3997 mutex_lock(&session->s_mutex); 3998 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3999 session->s_seq = 0; 4000 4001 dout("session %p state %s\n", session, 4002 ceph_session_state_name(session->s_state)); 4003 4004 spin_lock(&session->s_gen_ttl_lock); 4005 session->s_cap_gen++; 4006 spin_unlock(&session->s_gen_ttl_lock); 4007 4008 spin_lock(&session->s_cap_lock); 4009 /* don't know if session is readonly */ 4010 session->s_readonly = 0; 4011 /* 4012 * notify __ceph_remove_cap() that we are composing cap reconnect. 4013 * If a cap get released before being added to the cap reconnect, 4014 * __ceph_remove_cap() should skip queuing cap release. 
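 * The flag is cleared again once ceph_iterate_session_caps() further
 * below has finished encoding the caps.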
4015 */ 4016 session->s_cap_reconnect = 1; 4017 /* drop old cap expires; we're about to reestablish that state */ 4018 detach_cap_releases(session, &dispose); 4019 spin_unlock(&session->s_cap_lock); 4020 dispose_cap_releases(mdsc, &dispose); 4021 4022 /* trim unused caps to reduce MDS's cache rejoin time */ 4023 if (mdsc->fsc->sb->s_root) 4024 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4025 4026 ceph_con_close(&session->s_con); 4027 ceph_con_open(&session->s_con, 4028 CEPH_ENTITY_TYPE_MDS, mds, 4029 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4030 4031 /* replay unsafe requests */ 4032 replay_unsafe_requests(mdsc, session); 4033 4034 ceph_early_kick_flushing_caps(mdsc, session); 4035 4036 down_read(&mdsc->snap_rwsem); 4037 4038 /* placeholder for nr_caps */ 4039 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4040 if (err) 4041 goto fail; 4042 4043 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4044 recon_state.msg_version = 3; 4045 recon_state.allow_multi = true; 4046 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4047 recon_state.msg_version = 3; 4048 } else { 4049 recon_state.msg_version = 2; 4050 } 4051 /* traverse this session's caps */ 4052 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4053 4054 spin_lock(&session->s_cap_lock); 4055 session->s_cap_reconnect = 0; 4056 spin_unlock(&session->s_cap_lock); 4057 4058 if (err < 0) 4059 goto fail; 4060 4061 /* check if all realms can be encoded into current message */ 4062 if (mdsc->num_snap_realms) { 4063 size_t total_len = 4064 recon_state.pagelist->length + 4065 mdsc->num_snap_realms * 4066 sizeof(struct ceph_mds_snaprealm_reconnect); 4067 if (recon_state.msg_version >= 4) { 4068 /* number of realms */ 4069 total_len += sizeof(u32); 4070 /* version, compat_version and struct_len */ 4071 total_len += mdsc->num_snap_realms * 4072 (2 * sizeof(u8) + sizeof(u32)); 4073 } 4074 if (total_len > RECONNECT_MAX_SIZE) { 4075 if (!recon_state.allow_multi) { 4076 err = -ENOSPC; 4077 goto fail; 4078 } 4079 if (recon_state.nr_caps) { 4080 err = send_reconnect_partial(&recon_state); 4081 if (err) 4082 goto fail; 4083 } 4084 recon_state.msg_version = 5; 4085 } 4086 } 4087 4088 err = encode_snap_realms(mdsc, &recon_state); 4089 if (err < 0) 4090 goto fail; 4091 4092 if (recon_state.msg_version >= 5) { 4093 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4094 if (err < 0) 4095 goto fail; 4096 } 4097 4098 if (recon_state.nr_caps || recon_state.nr_realms) { 4099 struct page *page = 4100 list_first_entry(&recon_state.pagelist->head, 4101 struct page, lru); 4102 __le32 *addr = kmap_atomic(page); 4103 if (recon_state.nr_caps) { 4104 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4105 *addr = cpu_to_le32(recon_state.nr_caps); 4106 } else if (recon_state.msg_version >= 4) { 4107 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4108 } 4109 kunmap_atomic(addr); 4110 } 4111 4112 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4113 if (recon_state.msg_version >= 4) 4114 reply->hdr.compat_version = cpu_to_le16(4); 4115 4116 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4117 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4118 4119 ceph_con_send(&session->s_con, reply); 4120 4121 mutex_unlock(&session->s_mutex); 4122 4123 mutex_lock(&mdsc->mutex); 4124 __wake_requests(mdsc, &session->s_waiting); 4125 mutex_unlock(&mdsc->mutex); 4126 4127 up_read(&mdsc->snap_rwsem); 4128 ceph_pagelist_release(recon_state.pagelist); 4129 return; 4130
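	/*
	 * Error paths: undo in roughly the reverse order of setup -- drop the
	 * reply message, the snap_rwsem read lock and the session mutex, then
	 * release the pagelist and report the error.
	 */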
4131fail: 4132 ceph_msg_put(reply); 4133 up_read(&mdsc->snap_rwsem); 4134 mutex_unlock(&session->s_mutex); 4135fail_nomsg: 4136 ceph_pagelist_release(recon_state.pagelist); 4137fail_nopagelist: 4138 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4139 return; 4140} 4141 4142 4143/* 4144 * compare old and new mdsmaps, kicking requests 4145 * and closing out old connections as necessary 4146 * 4147 * called under mdsc->mutex. 4148 */ 4149static void check_new_map(struct ceph_mds_client *mdsc, 4150 struct ceph_mdsmap *newmap, 4151 struct ceph_mdsmap *oldmap) 4152{ 4153 int i; 4154 int oldstate, newstate; 4155 struct ceph_mds_session *s; 4156 4157 dout("check_new_map new %u old %u\n", 4158 newmap->m_epoch, oldmap->m_epoch); 4159 4160 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4161 if (!mdsc->sessions[i]) 4162 continue; 4163 s = mdsc->sessions[i]; 4164 oldstate = ceph_mdsmap_get_state(oldmap, i); 4165 newstate = ceph_mdsmap_get_state(newmap, i); 4166 4167 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4168 i, ceph_mds_state_name(oldstate), 4169 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4170 ceph_mds_state_name(newstate), 4171 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4172 ceph_session_state_name(s->s_state)); 4173 4174 if (i >= newmap->possible_max_rank) { 4175 /* force close session for stopped mds */ 4176 ceph_get_mds_session(s); 4177 __unregister_session(mdsc, s); 4178 __wake_requests(mdsc, &s->s_waiting); 4179 mutex_unlock(&mdsc->mutex); 4180 4181 mutex_lock(&s->s_mutex); 4182 cleanup_session_requests(mdsc, s); 4183 remove_session_caps(s); 4184 mutex_unlock(&s->s_mutex); 4185 4186 ceph_put_mds_session(s); 4187 4188 mutex_lock(&mdsc->mutex); 4189 kick_requests(mdsc, i); 4190 continue; 4191 } 4192 4193 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4194 ceph_mdsmap_get_addr(newmap, i), 4195 sizeof(struct ceph_entity_addr))) { 4196 /* just close it */ 4197 mutex_unlock(&mdsc->mutex); 4198 mutex_lock(&s->s_mutex); 4199 mutex_lock(&mdsc->mutex); 4200 ceph_con_close(&s->s_con); 4201 mutex_unlock(&s->s_mutex); 4202 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4203 } else if (oldstate == newstate) { 4204 continue; /* nothing new with this mds */ 4205 } 4206 4207 /* 4208 * send reconnect? 4209 */ 4210 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4211 newstate >= CEPH_MDS_STATE_RECONNECT) { 4212 mutex_unlock(&mdsc->mutex); 4213 send_mds_reconnect(mdsc, s); 4214 mutex_lock(&mdsc->mutex); 4215 } 4216 4217 /* 4218 * kick request on any mds that has gone active. 
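 * Flushing caps are kicked as well, and cap waiters on the session are
 * woken.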
4219 */ 4220 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4221 newstate >= CEPH_MDS_STATE_ACTIVE) { 4222 if (oldstate != CEPH_MDS_STATE_CREATING && 4223 oldstate != CEPH_MDS_STATE_STARTING) 4224 pr_info("mds%d recovery completed\n", s->s_mds); 4225 kick_requests(mdsc, i); 4226 mutex_unlock(&mdsc->mutex); 4227 mutex_lock(&s->s_mutex); 4228 mutex_lock(&mdsc->mutex); 4229 ceph_kick_flushing_caps(mdsc, s); 4230 mutex_unlock(&s->s_mutex); 4231 wake_up_session_caps(s, RECONNECT); 4232 } 4233 } 4234 4235 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4236 s = mdsc->sessions[i]; 4237 if (!s) 4238 continue; 4239 if (!ceph_mdsmap_is_laggy(newmap, i)) 4240 continue; 4241 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4242 s->s_state == CEPH_MDS_SESSION_HUNG || 4243 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4244 dout(" connecting to export targets of laggy mds%d\n", 4245 i); 4246 __open_export_target_sessions(mdsc, s); 4247 } 4248 } 4249} 4250 4251 4252 4253/* 4254 * leases 4255 */ 4256 4257/* 4258 * caller must hold session s_mutex, dentry->d_lock 4259 */ 4260void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4261{ 4262 struct ceph_dentry_info *di = ceph_dentry(dentry); 4263 4264 ceph_put_mds_session(di->lease_session); 4265 di->lease_session = NULL; 4266} 4267 4268static void handle_lease(struct ceph_mds_client *mdsc, 4269 struct ceph_mds_session *session, 4270 struct ceph_msg *msg) 4271{ 4272 struct super_block *sb = mdsc->fsc->sb; 4273 struct inode *inode; 4274 struct dentry *parent, *dentry; 4275 struct ceph_dentry_info *di; 4276 int mds = session->s_mds; 4277 struct ceph_mds_lease *h = msg->front.iov_base; 4278 u32 seq; 4279 struct ceph_vino vino; 4280 struct qstr dname; 4281 int release = 0; 4282 4283 dout("handle_lease from mds%d\n", mds); 4284 4285 /* decode */ 4286 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4287 goto bad; 4288 vino.ino = le64_to_cpu(h->ino); 4289 vino.snap = CEPH_NOSNAP; 4290 seq = le32_to_cpu(h->seq); 4291 dname.len = get_unaligned_le32(h + 1); 4292 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4293 goto bad; 4294 dname.name = (void *)(h + 1) + sizeof(u32); 4295 4296 /* lookup inode */ 4297 inode = ceph_find_inode(sb, vino); 4298 dout("handle_lease %s, ino %llx %p %.*s\n", 4299 ceph_lease_op_name(h->action), vino.ino, inode, 4300 dname.len, dname.name); 4301 4302 mutex_lock(&session->s_mutex); 4303 inc_session_sequence(session); 4304 4305 if (!inode) { 4306 dout("handle_lease no inode %llx\n", vino.ino); 4307 goto release; 4308 } 4309 4310 /* dentry */ 4311 parent = d_find_alias(inode); 4312 if (!parent) { 4313 dout("no parent dentry on inode %p\n", inode); 4314 WARN_ON(1); 4315 goto release; /* hrm... 
*/ 4316 } 4317 dname.hash = full_name_hash(parent, dname.name, dname.len); 4318 dentry = d_lookup(parent, &dname); 4319 dput(parent); 4320 if (!dentry) 4321 goto release; 4322 4323 spin_lock(&dentry->d_lock); 4324 di = ceph_dentry(dentry); 4325 switch (h->action) { 4326 case CEPH_MDS_LEASE_REVOKE: 4327 if (di->lease_session == session) { 4328 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4329 h->seq = cpu_to_le32(di->lease_seq); 4330 __ceph_mdsc_drop_dentry_lease(dentry); 4331 } 4332 release = 1; 4333 break; 4334 4335 case CEPH_MDS_LEASE_RENEW: 4336 if (di->lease_session == session && 4337 di->lease_gen == session->s_cap_gen && 4338 di->lease_renew_from && 4339 di->lease_renew_after == 0) { 4340 unsigned long duration = 4341 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4342 4343 di->lease_seq = seq; 4344 di->time = di->lease_renew_from + duration; 4345 di->lease_renew_after = di->lease_renew_from + 4346 (duration >> 1); 4347 di->lease_renew_from = 0; 4348 } 4349 break; 4350 } 4351 spin_unlock(&dentry->d_lock); 4352 dput(dentry); 4353 4354 if (!release) 4355 goto out; 4356 4357release: 4358 /* let's just reuse the same message */ 4359 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4360 ceph_msg_get(msg); 4361 ceph_con_send(&session->s_con, msg); 4362 4363out: 4364 mutex_unlock(&session->s_mutex); 4365 /* avoid calling iput_final() in mds dispatch threads */ 4366 ceph_async_iput(inode); 4367 return; 4368 4369bad: 4370 pr_err("corrupt lease message\n"); 4371 ceph_msg_dump(msg); 4372} 4373 4374void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4375 struct dentry *dentry, char action, 4376 u32 seq) 4377{ 4378 struct ceph_msg *msg; 4379 struct ceph_mds_lease *lease; 4380 struct inode *dir; 4381 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4382 4383 dout("lease_send_msg dentry %p %s to mds%d\n", 4384 dentry, ceph_lease_op_name(action), session->s_mds); 4385 4386 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4387 if (!msg) 4388 return; 4389 lease = msg->front.iov_base; 4390 lease->action = action; 4391 lease->seq = cpu_to_le32(seq); 4392 4393 spin_lock(&dentry->d_lock); 4394 dir = d_inode(dentry->d_parent); 4395 lease->ino = cpu_to_le64(ceph_ino(dir)); 4396 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4397 4398 put_unaligned_le32(dentry->d_name.len, lease + 1); 4399 memcpy((void *)(lease + 1) + 4, 4400 dentry->d_name.name, dentry->d_name.len); 4401 spin_unlock(&dentry->d_lock); 4402 /* 4403 * if this is a preemptive lease RELEASE, no need to 4404 * flush request stream, since the actual request will 4405 * soon follow.
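 * (more_to_follow is a hint to the messenger that more traffic is
 * imminent, so the socket write may be batched with what follows.)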
4406 */ 4407 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4408 4409 ceph_con_send(&session->s_con, msg); 4410} 4411 4412/* 4413 * lock unlock the session, to wait ongoing session activities 4414 */ 4415static void lock_unlock_session(struct ceph_mds_session *s) 4416{ 4417 mutex_lock(&s->s_mutex); 4418 mutex_unlock(&s->s_mutex); 4419} 4420 4421static void maybe_recover_session(struct ceph_mds_client *mdsc) 4422{ 4423 struct ceph_fs_client *fsc = mdsc->fsc; 4424 4425 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4426 return; 4427 4428 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4429 return; 4430 4431 if (!READ_ONCE(fsc->blocklisted)) 4432 return; 4433 4434 if (fsc->last_auto_reconnect && 4435 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30)) 4436 return; 4437 4438 pr_info("auto reconnect after blocklisted\n"); 4439 fsc->last_auto_reconnect = jiffies; 4440 ceph_force_reconnect(fsc->sb); 4441} 4442 4443bool check_session_state(struct ceph_mds_session *s) 4444{ 4445 switch (s->s_state) { 4446 case CEPH_MDS_SESSION_OPEN: 4447 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4448 s->s_state = CEPH_MDS_SESSION_HUNG; 4449 pr_info("mds%d hung\n", s->s_mds); 4450 } 4451 break; 4452 case CEPH_MDS_SESSION_CLOSING: 4453 /* Should never reach this when we're unmounting */ 4454 WARN_ON_ONCE(s->s_ttl); 4455 fallthrough; 4456 case CEPH_MDS_SESSION_NEW: 4457 case CEPH_MDS_SESSION_RESTARTING: 4458 case CEPH_MDS_SESSION_CLOSED: 4459 case CEPH_MDS_SESSION_REJECTED: 4460 return false; 4461 } 4462 4463 return true; 4464} 4465 4466/* 4467 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4468 * then we need to retransmit that request. 4469 */ 4470void inc_session_sequence(struct ceph_mds_session *s) 4471{ 4472 lockdep_assert_held(&s->s_mutex); 4473 4474 s->s_seq++; 4475 4476 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4477 int ret; 4478 4479 dout("resending session close request for mds%d\n", s->s_mds); 4480 ret = request_close_session(s); 4481 if (ret < 0) 4482 pr_err("unable to close session to mds%d: %d\n", 4483 s->s_mds, ret); 4484 } 4485} 4486 4487/* 4488 * delayed work -- periodically trim expired leases, renew caps with mds. If 4489 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 4490 * workqueue delay value of 5 secs will be used. 
4491 */ 4492static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 4493{ 4494 unsigned long max_delay = HZ * 5; 4495 4496 /* 5 secs default delay */ 4497 if (!delay || (delay > max_delay)) 4498 delay = max_delay; 4499 schedule_delayed_work(&mdsc->delayed_work, 4500 round_jiffies_relative(delay)); 4501} 4502 4503static void delayed_work(struct work_struct *work) 4504{ 4505 struct ceph_mds_client *mdsc = 4506 container_of(work, struct ceph_mds_client, delayed_work.work); 4507 unsigned long delay; 4508 int renew_interval; 4509 int renew_caps; 4510 int i; 4511 4512 dout("mdsc delayed_work\n"); 4513 4514 if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) 4515 return; 4516 4517 mutex_lock(&mdsc->mutex); 4518 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4519 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4520 mdsc->last_renew_caps); 4521 if (renew_caps) 4522 mdsc->last_renew_caps = jiffies; 4523 4524 for (i = 0; i < mdsc->max_sessions; i++) { 4525 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4526 if (!s) 4527 continue; 4528 4529 if (!check_session_state(s)) { 4530 ceph_put_mds_session(s); 4531 continue; 4532 } 4533 mutex_unlock(&mdsc->mutex); 4534 4535 mutex_lock(&s->s_mutex); 4536 if (renew_caps) 4537 send_renew_caps(mdsc, s); 4538 else 4539 ceph_con_keepalive(&s->s_con); 4540 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4541 s->s_state == CEPH_MDS_SESSION_HUNG) 4542 ceph_send_cap_releases(mdsc, s); 4543 mutex_unlock(&s->s_mutex); 4544 ceph_put_mds_session(s); 4545 4546 mutex_lock(&mdsc->mutex); 4547 } 4548 mutex_unlock(&mdsc->mutex); 4549 4550 delay = ceph_check_delayed_caps(mdsc); 4551 4552 ceph_queue_cap_reclaim_work(mdsc); 4553 4554 ceph_trim_snapid_map(mdsc); 4555 4556 maybe_recover_session(mdsc); 4557 4558 schedule_delayed(mdsc, delay); 4559} 4560 4561int ceph_mdsc_init(struct ceph_fs_client *fsc) 4562 4563{ 4564 struct ceph_mds_client *mdsc; 4565 int err; 4566 4567 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4568 if (!mdsc) 4569 return -ENOMEM; 4570 mdsc->fsc = fsc; 4571 mutex_init(&mdsc->mutex); 4572 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4573 if (!mdsc->mdsmap) { 4574 err = -ENOMEM; 4575 goto err_mdsc; 4576 } 4577 4578 init_completion(&mdsc->safe_umount_waiters); 4579 init_waitqueue_head(&mdsc->session_close_wq); 4580 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4581 mdsc->sessions = NULL; 4582 atomic_set(&mdsc->num_sessions, 0); 4583 mdsc->max_sessions = 0; 4584 mdsc->stopping = 0; 4585 atomic64_set(&mdsc->quotarealms_count, 0); 4586 mdsc->quotarealms_inodes = RB_ROOT; 4587 mutex_init(&mdsc->quotarealms_inodes_mutex); 4588 mdsc->last_snap_seq = 0; 4589 init_rwsem(&mdsc->snap_rwsem); 4590 mdsc->snap_realms = RB_ROOT; 4591 INIT_LIST_HEAD(&mdsc->snap_empty); 4592 mdsc->num_snap_realms = 0; 4593 spin_lock_init(&mdsc->snap_empty_lock); 4594 mdsc->last_tid = 0; 4595 mdsc->oldest_tid = 0; 4596 mdsc->request_tree = RB_ROOT; 4597 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4598 mdsc->last_renew_caps = jiffies; 4599 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4600 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4601 spin_lock_init(&mdsc->cap_delay_lock); 4602 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4603 spin_lock_init(&mdsc->snap_flush_lock); 4604 mdsc->last_cap_flush_tid = 1; 4605 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4606 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4607 mdsc->num_cap_flushing = 0; 4608 spin_lock_init(&mdsc->cap_dirty_lock); 4609 init_waitqueue_head(&mdsc->cap_flushing_wq); 4610 
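	/* cap reclaim worker and metrics */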
INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4611 atomic_set(&mdsc->cap_reclaim_pending, 0); 4612 err = ceph_metric_init(&mdsc->metric); 4613 if (err) 4614 goto err_mdsmap; 4615 4616 spin_lock_init(&mdsc->dentry_list_lock); 4617 INIT_LIST_HEAD(&mdsc->dentry_leases); 4618 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4619 4620 ceph_caps_init(mdsc); 4621 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4622 4623 spin_lock_init(&mdsc->snapid_map_lock); 4624 mdsc->snapid_map_tree = RB_ROOT; 4625 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4626 4627 init_rwsem(&mdsc->pool_perm_rwsem); 4628 mdsc->pool_perm_tree = RB_ROOT; 4629 4630 strscpy(mdsc->nodename, utsname()->nodename, 4631 sizeof(mdsc->nodename)); 4632 4633 fsc->mdsc = mdsc; 4634 return 0; 4635 4636err_mdsmap: 4637 kfree(mdsc->mdsmap); 4638err_mdsc: 4639 kfree(mdsc); 4640 return err; 4641} 4642 4643/* 4644 * Wait for safe replies on open mds requests. If we time out, drop 4645 * all requests from the tree to avoid dangling dentry refs. 4646 */ 4647static void wait_requests(struct ceph_mds_client *mdsc) 4648{ 4649 struct ceph_options *opts = mdsc->fsc->client->options; 4650 struct ceph_mds_request *req; 4651 4652 mutex_lock(&mdsc->mutex); 4653 if (__get_oldest_req(mdsc)) { 4654 mutex_unlock(&mdsc->mutex); 4655 4656 dout("wait_requests waiting for requests\n"); 4657 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4658 ceph_timeout_jiffies(opts->mount_timeout)); 4659 4660 /* tear down remaining requests */ 4661 mutex_lock(&mdsc->mutex); 4662 while ((req = __get_oldest_req(mdsc))) { 4663 dout("wait_requests timed out on tid %llu\n", 4664 req->r_tid); 4665 list_del_init(&req->r_wait); 4666 __unregister_request(mdsc, req); 4667 } 4668 } 4669 mutex_unlock(&mdsc->mutex); 4670 dout("wait_requests done\n"); 4671} 4672 4673void send_flush_mdlog(struct ceph_mds_session *s) 4674{ 4675 struct ceph_msg *msg; 4676 4677 /* 4678 * Pre-luminous MDS crashes when it sees an unknown session request 4679 */ 4680 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 4681 return; 4682 4683 mutex_lock(&s->s_mutex); 4684 dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds, 4685 ceph_session_state_name(s->s_state), s->s_seq); 4686 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 4687 s->s_seq); 4688 if (!msg) { 4689 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n", 4690 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 4691 } else { 4692 ceph_con_send(&s->s_con, msg); 4693 } 4694 mutex_unlock(&s->s_mutex); 4695} 4696 4697/* 4698 * called before mount is ro, and before dentries are torn down. 4699 * (hmm, does this still race with new lookups?) 4700 */ 4701void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4702{ 4703 dout("pre_umount\n"); 4704 mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN; 4705 4706 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); 4707 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); 4708 ceph_flush_dirty_caps(mdsc); 4709 wait_requests(mdsc); 4710 4711 /* 4712 * wait for reply handlers to drop their request refs and 4713 * their inode/dcache refs 4714 */ 4715 ceph_msgr_flush(); 4716 4717 ceph_cleanup_quotarealms_inodes(mdsc); 4718} 4719 4720/* 4721 * wait for all write mds requests to flush.
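 * We walk the request tree in tid order and wait for the safe reply on
 * every write request (other than SETFILELOCK) with a tid at or below
 * want_tid.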
4722 */ 4723static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4724{ 4725 struct ceph_mds_request *req = NULL, *nextreq; 4726 struct rb_node *n; 4727 4728 mutex_lock(&mdsc->mutex); 4729 dout("wait_unsafe_requests want %lld\n", want_tid); 4730restart: 4731 req = __get_oldest_req(mdsc); 4732 while (req && req->r_tid <= want_tid) { 4733 /* find next request */ 4734 n = rb_next(&req->r_node); 4735 if (n) 4736 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4737 else 4738 nextreq = NULL; 4739 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4740 (req->r_op & CEPH_MDS_OP_WRITE)) { 4741 /* write op */ 4742 ceph_mdsc_get_request(req); 4743 if (nextreq) 4744 ceph_mdsc_get_request(nextreq); 4745 mutex_unlock(&mdsc->mutex); 4746 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4747 req->r_tid, want_tid); 4748 wait_for_completion(&req->r_safe_completion); 4749 mutex_lock(&mdsc->mutex); 4750 ceph_mdsc_put_request(req); 4751 if (!nextreq) 4752 break; /* next dne before, so we're done! */ 4753 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4754 /* next request was removed from tree */ 4755 ceph_mdsc_put_request(nextreq); 4756 goto restart; 4757 } 4758 ceph_mdsc_put_request(nextreq); /* won't go away */ 4759 } 4760 req = nextreq; 4761 } 4762 mutex_unlock(&mdsc->mutex); 4763 dout("wait_unsafe_requests done\n"); 4764} 4765 4766void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4767{ 4768 u64 want_tid, want_flush; 4769 4770 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4771 return; 4772 4773 dout("sync\n"); 4774 mutex_lock(&mdsc->mutex); 4775 want_tid = mdsc->last_tid; 4776 mutex_unlock(&mdsc->mutex); 4777 4778 ceph_flush_dirty_caps(mdsc); 4779 spin_lock(&mdsc->cap_dirty_lock); 4780 want_flush = mdsc->last_cap_flush_tid; 4781 if (!list_empty(&mdsc->cap_flush_list)) { 4782 struct ceph_cap_flush *cf = 4783 list_last_entry(&mdsc->cap_flush_list, 4784 struct ceph_cap_flush, g_list); 4785 cf->wake = true; 4786 } 4787 spin_unlock(&mdsc->cap_dirty_lock); 4788 4789 dout("sync want tid %lld flush_seq %lld\n", 4790 want_tid, want_flush); 4791 4792 wait_unsafe_requests(mdsc, want_tid); 4793 wait_caps_flush(mdsc, want_flush); 4794} 4795 4796/* 4797 * true if all sessions are closed, or we force unmount 4798 */ 4799static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4800{ 4801 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4802 return true; 4803 return atomic_read(&mdsc->num_sessions) <= skipped; 4804} 4805 4806/* 4807 * called after sb is ro. 
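 * Ask each MDS to close its session, wait (up to the mount timeout) for
 * the sessions to close, then tear down whatever remains.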
*/ 4809void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4810{ 4811 struct ceph_options *opts = mdsc->fsc->client->options; 4812 struct ceph_mds_session *session; 4813 int i; 4814 int skipped = 0; 4815 4816 dout("close_sessions\n"); 4817 4818 /* close sessions */ 4819 mutex_lock(&mdsc->mutex); 4820 for (i = 0; i < mdsc->max_sessions; i++) { 4821 session = __ceph_lookup_mds_session(mdsc, i); 4822 if (!session) 4823 continue; 4824 mutex_unlock(&mdsc->mutex); 4825 mutex_lock(&session->s_mutex); 4826 if (__close_session(mdsc, session) <= 0) 4827 skipped++; 4828 mutex_unlock(&session->s_mutex); 4829 ceph_put_mds_session(session); 4830 mutex_lock(&mdsc->mutex); 4831 } 4832 mutex_unlock(&mdsc->mutex); 4833 4834 dout("waiting for sessions to close\n"); 4835 wait_event_timeout(mdsc->session_close_wq, 4836 done_closing_sessions(mdsc, skipped), 4837 ceph_timeout_jiffies(opts->mount_timeout)); 4838 4839 /* tear down remaining sessions */ 4840 mutex_lock(&mdsc->mutex); 4841 for (i = 0; i < mdsc->max_sessions; i++) { 4842 if (mdsc->sessions[i]) { 4843 session = ceph_get_mds_session(mdsc->sessions[i]); 4844 __unregister_session(mdsc, session); 4845 mutex_unlock(&mdsc->mutex); 4846 mutex_lock(&session->s_mutex); 4847 remove_session_caps(session); 4848 mutex_unlock(&session->s_mutex); 4849 ceph_put_mds_session(session); 4850 mutex_lock(&mdsc->mutex); 4851 } 4852 } 4853 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4854 mutex_unlock(&mdsc->mutex); 4855 4856 ceph_cleanup_snapid_map(mdsc); 4857 ceph_cleanup_empty_realms(mdsc); 4858 4859 cancel_work_sync(&mdsc->cap_reclaim_work); 4860 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4861 4862 dout("stopped\n"); 4863} 4864 4865void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4866{ 4867 struct ceph_mds_session *session; 4868 int mds; 4869 4870 dout("force umount\n"); 4871 4872 mutex_lock(&mdsc->mutex); 4873 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4874 session = __ceph_lookup_mds_session(mdsc, mds); 4875 if (!session) 4876 continue; 4877 4878 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4879 __unregister_session(mdsc, session); 4880 __wake_requests(mdsc, &session->s_waiting); 4881 mutex_unlock(&mdsc->mutex); 4882 4883 mutex_lock(&session->s_mutex); 4884 __close_session(mdsc, session); 4885 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4886 cleanup_session_requests(mdsc, session); 4887 remove_session_caps(session); 4888 } 4889 mutex_unlock(&session->s_mutex); 4890 ceph_put_mds_session(session); 4891 4892 mutex_lock(&mdsc->mutex); 4893 kick_requests(mdsc, mds); 4894 } 4895 __wake_requests(mdsc, &mdsc->waiting_for_map); 4896 mutex_unlock(&mdsc->mutex); 4897} 4898 4899static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4900{ 4901 dout("stop\n"); 4902 /* 4903 * Make sure the delayed work has stopped before releasing 4904 * the resources: cancel_delayed_work_sync() only guarantees 4905 * that any work currently executing has finished, and the 4906 * delayed work can re-arm itself again after that, so 4907 * explicitly flush it as well before tearing anything 4908 * down.
4909 */ 4910 flush_delayed_work(&mdsc->delayed_work); 4911 4912 if (mdsc->mdsmap) 4913 ceph_mdsmap_destroy(mdsc->mdsmap); 4914 kfree(mdsc->sessions); 4915 ceph_caps_finalize(mdsc); 4916 ceph_pool_perm_destroy(mdsc); 4917} 4918 4919void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4920{ 4921 struct ceph_mds_client *mdsc = fsc->mdsc; 4922 dout("mdsc_destroy %p\n", mdsc); 4923 4924 if (!mdsc) 4925 return; 4926 4927 /* flush out any connection work with references to us */ 4928 ceph_msgr_flush(); 4929 4930 ceph_mdsc_stop(mdsc); 4931 4932 ceph_metric_destroy(&mdsc->metric); 4933 4934 fsc->mdsc = NULL; 4935 kfree(mdsc); 4936 dout("mdsc_destroy %p done\n", mdsc); 4937} 4938 4939void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4940{ 4941 struct ceph_fs_client *fsc = mdsc->fsc; 4942 const char *mds_namespace = fsc->mount_options->mds_namespace; 4943 void *p = msg->front.iov_base; 4944 void *end = p + msg->front.iov_len; 4945 u32 epoch; 4946 u32 map_len; 4947 u32 num_fs; 4948 u32 mount_fscid = (u32)-1; 4949 u8 struct_v, struct_cv; 4950 int err = -EINVAL; 4951 4952 ceph_decode_need(&p, end, sizeof(u32), bad); 4953 epoch = ceph_decode_32(&p); 4954 4955 dout("handle_fsmap epoch %u\n", epoch); 4956 4957 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4958 struct_v = ceph_decode_8(&p); 4959 struct_cv = ceph_decode_8(&p); 4960 map_len = ceph_decode_32(&p); 4961 4962 ceph_decode_need(&p, end, sizeof(u32) * 3, bad); 4963 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ 4964 4965 num_fs = ceph_decode_32(&p); 4966 while (num_fs-- > 0) { 4967 void *info_p, *info_end; 4968 u32 info_len; 4969 u8 info_v, info_cv; 4970 u32 fscid, namelen; 4971 4972 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4973 info_v = ceph_decode_8(&p); 4974 info_cv = ceph_decode_8(&p); 4975 info_len = ceph_decode_32(&p); 4976 ceph_decode_need(&p, end, info_len, bad); 4977 info_p = p; 4978 info_end = p + info_len; 4979 p = info_end; 4980 4981 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4982 fscid = ceph_decode_32(&info_p); 4983 namelen = ceph_decode_32(&info_p); 4984 ceph_decode_need(&info_p, info_end, namelen, bad); 4985 4986 if (mds_namespace && 4987 strlen(mds_namespace) == namelen && 4988 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4989 mount_fscid = fscid; 4990 break; 4991 } 4992 } 4993 4994 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4995 if (mount_fscid != (u32)-1) { 4996 fsc->client->monc.fs_cluster_id = mount_fscid; 4997 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4998 0, true); 4999 ceph_monc_renew_subs(&fsc->client->monc); 5000 } else { 5001 err = -ENOENT; 5002 goto err_out; 5003 } 5004 return; 5005 5006bad: 5007 pr_err("error decoding fsmap\n"); 5008err_out: 5009 mutex_lock(&mdsc->mutex); 5010 mdsc->mdsmap_err = err; 5011 __wake_requests(mdsc, &mdsc->waiting_for_map); 5012 mutex_unlock(&mdsc->mutex); 5013} 5014 5015/* 5016 * handle mds map update. 
5017 */ 5018void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5019{ 5020 u32 epoch; 5021 u32 maplen; 5022 void *p = msg->front.iov_base; 5023 void *end = p + msg->front.iov_len; 5024 struct ceph_mdsmap *newmap, *oldmap; 5025 struct ceph_fsid fsid; 5026 int err = -EINVAL; 5027 5028 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5029 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5030 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5031 return; 5032 epoch = ceph_decode_32(&p); 5033 maplen = ceph_decode_32(&p); 5034 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 5035 5036 /* do we need it? */ 5037 mutex_lock(&mdsc->mutex); 5038 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5039 dout("handle_map epoch %u <= our %u\n", 5040 epoch, mdsc->mdsmap->m_epoch); 5041 mutex_unlock(&mdsc->mutex); 5042 return; 5043 } 5044 5045 newmap = ceph_mdsmap_decode(&p, end); 5046 if (IS_ERR(newmap)) { 5047 err = PTR_ERR(newmap); 5048 goto bad_unlock; 5049 } 5050 5051 /* swap into place */ 5052 if (mdsc->mdsmap) { 5053 oldmap = mdsc->mdsmap; 5054 mdsc->mdsmap = newmap; 5055 check_new_map(mdsc, newmap, oldmap); 5056 ceph_mdsmap_destroy(oldmap); 5057 } else { 5058 mdsc->mdsmap = newmap; /* first mds map */ 5059 } 5060 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5061 MAX_LFS_FILESIZE); 5062 5063 __wake_requests(mdsc, &mdsc->waiting_for_map); 5064 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5065 mdsc->mdsmap->m_epoch); 5066 5067 mutex_unlock(&mdsc->mutex); 5068 schedule_delayed(mdsc, 0); 5069 return; 5070 5071bad_unlock: 5072 mutex_unlock(&mdsc->mutex); 5073bad: 5074 pr_err("error decoding mdsmap %d\n", err); 5075 return; 5076} 5077 5078static struct ceph_connection *con_get(struct ceph_connection *con) 5079{ 5080 struct ceph_mds_session *s = con->private; 5081 5082 if (ceph_get_mds_session(s)) 5083 return con; 5084 return NULL; 5085} 5086 5087static void con_put(struct ceph_connection *con) 5088{ 5089 struct ceph_mds_session *s = con->private; 5090 5091 ceph_put_mds_session(s); 5092} 5093 5094/* 5095 * if the client is unresponsive for long enough, the mds will kill 5096 * the session entirely. 
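 * When that happens the messenger calls peer_reset() below, and we respond
 * by starting a new reconnect so the shared state can be reestablished.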
5097 */ 5098static void peer_reset(struct ceph_connection *con) 5099{ 5100 struct ceph_mds_session *s = con->private; 5101 struct ceph_mds_client *mdsc = s->s_mdsc; 5102 5103 pr_warn("mds%d closed our session\n", s->s_mds); 5104 send_mds_reconnect(mdsc, s); 5105} 5106 5107static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5108{ 5109 struct ceph_mds_session *s = con->private; 5110 struct ceph_mds_client *mdsc = s->s_mdsc; 5111 int type = le16_to_cpu(msg->hdr.type); 5112 5113 mutex_lock(&mdsc->mutex); 5114 if (__verify_registered_session(mdsc, s) < 0) { 5115 mutex_unlock(&mdsc->mutex); 5116 goto out; 5117 } 5118 mutex_unlock(&mdsc->mutex); 5119 5120 switch (type) { 5121 case CEPH_MSG_MDS_MAP: 5122 ceph_mdsc_handle_mdsmap(mdsc, msg); 5123 break; 5124 case CEPH_MSG_FS_MAP_USER: 5125 ceph_mdsc_handle_fsmap(mdsc, msg); 5126 break; 5127 case CEPH_MSG_CLIENT_SESSION: 5128 handle_session(s, msg); 5129 break; 5130 case CEPH_MSG_CLIENT_REPLY: 5131 handle_reply(s, msg); 5132 break; 5133 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5134 handle_forward(mdsc, s, msg); 5135 break; 5136 case CEPH_MSG_CLIENT_CAPS: 5137 ceph_handle_caps(s, msg); 5138 break; 5139 case CEPH_MSG_CLIENT_SNAP: 5140 ceph_handle_snap(mdsc, s, msg); 5141 break; 5142 case CEPH_MSG_CLIENT_LEASE: 5143 handle_lease(mdsc, s, msg); 5144 break; 5145 case CEPH_MSG_CLIENT_QUOTA: 5146 ceph_handle_quota(mdsc, s, msg); 5147 break; 5148 5149 default: 5150 pr_err("received unknown message type %d %s\n", type, 5151 ceph_msg_type_name(type)); 5152 } 5153out: 5154 ceph_msg_put(msg); 5155} 5156 5157/* 5158 * authentication 5159 */ 5160 5161/* 5162 * Note: returned pointer is the address of a structure that's 5163 * managed separately. Caller must *not* attempt to free it. 5164 */ 5165static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 5166 int *proto, int force_new) 5167{ 5168 struct ceph_mds_session *s = con->private; 5169 struct ceph_mds_client *mdsc = s->s_mdsc; 5170 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5171 struct ceph_auth_handshake *auth = &s->s_auth; 5172 5173 if (force_new && auth->authorizer) { 5174 ceph_auth_destroy_authorizer(auth->authorizer); 5175 auth->authorizer = NULL; 5176 } 5177 if (!auth->authorizer) { 5178 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 5179 auth); 5180 if (ret) 5181 return ERR_PTR(ret); 5182 } else { 5183 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 5184 auth); 5185 if (ret) 5186 return ERR_PTR(ret); 5187 } 5188 *proto = ac->protocol; 5189 5190 return auth; 5191} 5192 5193static int add_authorizer_challenge(struct ceph_connection *con, 5194 void *challenge_buf, int challenge_buf_len) 5195{ 5196 struct ceph_mds_session *s = con->private; 5197 struct ceph_mds_client *mdsc = s->s_mdsc; 5198 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5199 5200 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5201 challenge_buf, challenge_buf_len); 5202} 5203 5204static int verify_authorizer_reply(struct ceph_connection *con) 5205{ 5206 struct ceph_mds_session *s = con->private; 5207 struct ceph_mds_client *mdsc = s->s_mdsc; 5208 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5209 5210 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); 5211} 5212 5213static int invalidate_authorizer(struct ceph_connection *con) 5214{ 5215 struct ceph_mds_session *s = con->private; 5216 struct ceph_mds_client *mdsc = s->s_mdsc; 5217 struct ceph_auth_client *ac = 
mdsc->fsc->client->monc.auth; 5218 5219 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5220 5221 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5222} 5223 5224static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5225 struct ceph_msg_header *hdr, int *skip) 5226{ 5227 struct ceph_msg *msg; 5228 int type = (int) le16_to_cpu(hdr->type); 5229 int front_len = (int) le32_to_cpu(hdr->front_len); 5230 5231 if (con->in_msg) 5232 return con->in_msg; 5233 5234 *skip = 0; 5235 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5236 if (!msg) { 5237 pr_err("unable to allocate msg type %d len %d\n", 5238 type, front_len); 5239 return NULL; 5240 } 5241 5242 return msg; 5243} 5244 5245static int mds_sign_message(struct ceph_msg *msg) 5246{ 5247 struct ceph_mds_session *s = msg->con->private; 5248 struct ceph_auth_handshake *auth = &s->s_auth; 5249 5250 return ceph_auth_sign_message(auth, msg); 5251} 5252 5253static int mds_check_message_signature(struct ceph_msg *msg) 5254{ 5255 struct ceph_mds_session *s = msg->con->private; 5256 struct ceph_auth_handshake *auth = &s->s_auth; 5257 5258 return ceph_auth_check_message_signature(auth, msg); 5259} 5260 5261static const struct ceph_connection_operations mds_con_ops = { 5262 .get = con_get, 5263 .put = con_put, 5264 .dispatch = dispatch, 5265 .get_authorizer = get_authorizer, 5266 .add_authorizer_challenge = add_authorizer_challenge, 5267 .verify_authorizer_reply = verify_authorizer_reply, 5268 .invalidate_authorizer = invalidate_authorizer, 5269 .peer_reset = peer_reset, 5270 .alloc_msg = mds_alloc_msg, 5271 .sign_message = mds_sign_message, 5272 .check_message_signature = mds_check_message_signature, 5273}; 5274 5275/* eof */ 5276