// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Copyright (C) 2015, SUSE
 */


#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "md-bitmap.h"
#include "md-cluster.h"

#define LVB_SIZE 64
#define NEW_DEV_TIMEOUT 5000

struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
	bool sync_locking_done;
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev; /* pointing back to mddev. */
	int mode;
};

struct resync_info {
	__le64 lo;
	__le64 hi;
};

/* md_cluster_info flags */
#define MD_CLUSTER_WAITING_FOR_NEWDISK 1
#define MD_CLUSTER_SUSPEND_READ_BALANCING 2
#define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3

/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define MD_CLUSTER_SEND_LOCK 4
/* Used when cluster operations (such as adding a disk) must lock the
 * communication channel in order to perform extra operations
 * (update metadata) while no other operation is allowed on the
 * MD. The token needs to be locked and held until the operation
 * completes with md_update_sb(), which would eventually release
 * the lock.
 */
#define MD_CLUSTER_SEND_LOCKED_ALREADY 5
/* We should only receive messages after the node has joined the cluster and
 * set up all the related info such as bitmap and personality */
#define MD_CLUSTER_ALREADY_IN_CLUSTER 6
#define MD_CLUSTER_PENDING_RECV_EVENT 7
#define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8

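/*
 * Per-mddev cluster state, allocated in join() and reachable through
 * mddev->cluster_info.
 */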
struct md_cluster_info {
	struct mddev *mddev; /* the md device which md_cluster_info belongs to */
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;
	struct completion completion;
	struct mutex recv_mutex;
	struct dlm_lock_resource *bitmap_lockres;
	struct dlm_lock_resource **other_bitmap_lockres;
	struct dlm_lock_resource *resync_lockres;
	struct list_head suspend_list;

	spinlock_t suspend_lock;
	/* record the region in which writes should be suspended */
	sector_t suspend_lo;
	sector_t suspend_hi;
	int suspend_from; /* the slot which broadcast suspend_lo/hi */

	struct md_thread *recovery_thread;
	unsigned long recovery_map;
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;
	struct completion newdisk_completion;
	wait_queue_head_t wait;
	unsigned long state;
	/* record the region in RESYNCING message */
	sector_t sync_low;
	sector_t sync_hi;
};

enum msg_type {
	METADATA_UPDATED = 0,
	RESYNCING,
	NEWDISK,
	REMOVE,
	RE_ADD,
	BITMAP_NEEDS_SYNC,
	CHANGE_CAPACITY,
	BITMAP_RESIZE,
};

struct cluster_msg {
	__le32 type;
	__le32 slot;
	/* TODO: Unionize this for smaller footprint */
	__le64 low;
	__le64 high;
	char uuid[16];
	__le32 raid_slot;
};

static void sync_ast(void *arg)
{
	struct dlm_lock_resource *res;

	res = arg;
	res->sync_locking_done = true;
	wake_up(&res->sync_locking);
}

static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;
	wait_event(res->sync_locking, res->sync_locking_done);
	res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
	return dlm_lock_sync(res, DLM_LOCK_NL);
}

/*
 * A variation of dlm_lock_sync() that allows the lock request to be
 * interrupted
 */
static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
				       struct mddev *mddev)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;

	wait_event(res->sync_locking, res->sync_locking_done
				      || kthread_should_stop()
				      || test_bit(MD_CLOSING, &mddev->flags));
	if (!res->sync_locking_done) {
		/*
		 * the convert queue contains the lock request when the request
		 * is interrupted, and sync_ast could still run, so we need to
		 * cancel the request and reset the completion
		 */
		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
			&res->lksb, res);
		res->sync_locking_done = false;
		if (unlikely(ret != 0))
			pr_info("failed to cancel previous lock request %s return %d\n",
				res->name, ret);
		return -EPERM;
	} else
		res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

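/*
 * Allocate a lock resource and take it in NL mode, so that subsequent
 * dlm_lock_sync() calls on it are conversions (DLM_LKF_CONVERT).
 */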
static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
	struct dlm_lock_resource *res = NULL;
	int ret, namelen;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;
	init_waitqueue_head(&res->sync_locking);
	res->sync_locking_done = false;
	res->ls = cinfo->lockspace;
	res->mddev = mddev;
	res->mode = DLM_LOCK_IV;
	namelen = strlen(name);
	res->name = kzalloc(namelen + 1, GFP_KERNEL);
	if (!res->name) {
		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
		goto out_err;
	}
	strlcpy(res->name, name, namelen + 1);
	if (with_lvb) {
		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
		if (!res->lksb.sb_lvbptr) {
			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
			goto out_err;
		}
		res->flags = DLM_LKF_VALBLK;
	}

	if (bastfn)
		res->bast = bastfn;

	res->flags |= DLM_LKF_EXPEDITE;

	ret = dlm_lock_sync(res, DLM_LOCK_NL);
	if (ret) {
		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
		goto out_err;
	}
	res->flags &= ~DLM_LKF_EXPEDITE;
	res->flags |= DLM_LKF_CONVERT;

	return res;
out_err:
	kfree(res->lksb.sb_lvbptr);
	kfree(res->name);
	kfree(res);
	return NULL;
}

static void lockres_free(struct dlm_lock_resource *res)
{
	int ret = 0;

	if (!res)
		return;

	/*
	 * use FORCEUNLOCK flag, so we can unlock even if the lock is on the
	 * waiting or convert queue
	 */
	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
		&res->lksb, res);
	if (unlikely(ret != 0))
		pr_err("failed to unlock %s return %d\n", res->name, ret);
	else
		wait_event(res->sync_locking, res->sync_locking_done);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}

static void add_resync_info(struct dlm_lock_resource *lockres,
			    sector_t lo, sector_t hi)
{
	struct resync_info *ri;

	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
	ri->lo = cpu_to_le64(lo);
	ri->hi = cpu_to_le64(hi);
}

static int read_resync_info(struct mddev *mddev,
			    struct dlm_lock_resource *lockres)
{
	struct resync_info ri;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	dlm_lock_sync(lockres, DLM_LOCK_CR);
	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
	if (le64_to_cpu(ri.hi) > 0) {
		cinfo->suspend_hi = le64_to_cpu(ri.hi);
		cinfo->suspend_lo = le64_to_cpu(ri.lo);
		ret = 1;
	}
	dlm_unlock_sync(lockres);
	return ret;
}

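/*
 * Recovery thread body: for each failed slot recorded in recovery_map,
 * take that slot's bitmap lock, merge its bitmap into ours, clear the
 * suspend range and kick off resync/reshape if needed.
 */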
static void recover_bitmaps(struct md_thread *thread)
{
	struct mddev *mddev = thread->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	int slot, ret;
	sector_t lo, hi;

	while (cinfo->recovery_map) {
		slot = fls64((u64)cinfo->recovery_map) - 1;

		snprintf(str, 64, "bitmap%04d", slot);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize bitmaps\n");
			goto clear_bit;
		}

		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
		if (ret) {
			pr_err("md-cluster: Could not DLM lock %s: %d\n",
					str, ret);
			goto clear_bit;
		}
		ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
		if (ret) {
			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
			goto clear_bit;
		}

		/* Clear suspend_area associated with the bitmap */
		spin_lock_irq(&cinfo->suspend_lock);
		cinfo->suspend_hi = 0;
		cinfo->suspend_lo = 0;
		cinfo->suspend_from = -1;
		spin_unlock_irq(&cinfo->suspend_lock);

		/* Kick off a reshape if needed */
		if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
		    test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
		    mddev->reshape_position != MaxSector)
			md_wakeup_thread(mddev->sync_thread);

		if (hi > 0) {
			if (lo < mddev->recovery_cp)
				mddev->recovery_cp = lo;
			/* wake up thread to continue resync in case resync
			 * is not finished */
			if (mddev->recovery_cp != MaxSector) {
				/*
				 * clear the REMOTE flag since we will launch
				 * the resync thread in the current node.
				 */
				clear_bit(MD_RESYNCING_REMOTE,
					  &mddev->recovery);
				set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
				md_wakeup_thread(mddev->thread);
			}
		}
clear_bit:
		lockres_free(bm_lockres);
		clear_bit(slot, &cinfo->recovery_map);
	}
}

static void recover_prep(void *arg)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

static void __recover_slot(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(slot, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
				mddev, "recover");
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}

static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	/* Subtract one since DLM slot numbers start at one while
	 * cluster-md slot numbers start at 0 */
	__recover_slot(mddev, slot->slot - 1);
}

static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	/* the completion only needs to be completed when a node joins the
	 * cluster; it doesn't need to run during another node's failure */
	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
		complete(&cinfo->completion);
		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	}
	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

/* These ops are called when a node joins the cluster, and they perform
 * lock recovery if a node failure occurs */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};

/*
 * The BAST function for the ack lock resource
 * This function wakes up the receive thread in
 * order to receive and process the message.
 */
static void ack_bast(void *arg, int mode)
{
	struct dlm_lock_resource *res = arg;
	struct md_cluster_info *cinfo = res->mddev->cluster_info;

	if (mode == DLM_LOCK_EX) {
		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
			md_wakeup_thread(cinfo->recv_thread);
		else
			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
	}
}

static void remove_suspend_info(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	cinfo->suspend_hi = 0;
	cinfo->suspend_lo = 0;
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

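/*
 * Handle a RESYNCING message: record the [lo, hi] range that the sending
 * node is resyncing so that local writes to that region are suspended.
 * A zero 'hi' means the remote resync has finished.
 */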
static void process_suspend_info(struct mddev *mddev,
		int slot, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct mdp_superblock_1 *sb = NULL;
	struct md_rdev *rdev;

	if (!hi) {
		/*
		 * clear the REMOTE flag since resync or recovery is finished
		 * in the remote node.
		 */
		clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		remove_suspend_info(mddev, slot);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
		return;
	}

	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			sb = page_address(rdev->sb_page);
			break;
		}

	/*
	 * The bitmaps are not the same for different nodes.
	 * If RESYNCING is happening in one node, then
	 * the node which received the RESYNCING message
	 * probably will perform resync with the region
	 * [lo, hi] again, so we could reduce resync time
	 * a lot if we can ensure that the bitmaps among
	 * different nodes match up well.
	 *
	 * sync_low/hi is used to record the region which
	 * arrived in the previous RESYNCING message.
	 *
	 * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK
	 * and set RESYNC_MASK since the resync thread is running
	 * in another node, so we don't need to do the resync
	 * again with the same section.
	 *
	 * Skip md_bitmap_sync_with_cluster in case reshape is
	 * happening, because the reshaping region is small and
	 * we don't want to trigger lots of WARNs.
	 */
	if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
		md_bitmap_sync_with_cluster(mddev, cinfo->sync_low,
					    cinfo->sync_hi, lo, hi);
	cinfo->sync_low = lo;
	cinfo->sync_hi = hi;

	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	cinfo->suspend_from = slot;
	cinfo->suspend_lo = lo;
	cinfo->suspend_hi = hi;
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	wait_for_completion_timeout(&cinfo->newdisk_completion,
			NEW_DEV_TIMEOUT);
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}


static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	int got_lock = 0;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);

	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	wait_event(mddev->thread->wqueue,
		   (got_lock = mddev_trylock(mddev)) ||
		   test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
	md_reload_sb(mddev, mddev->good_device_nr);
	if (got_lock)
		mddev_unlock(mddev);
}

static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev) {
		set_bit(ClusterRemove, &rdev->flags);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
	}
	else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev && test_bit(Faulty, &rdev->flags))
		clear_bit(Faulty, &rdev->flags);
	else
		pr_warn("%s: %d Could not find disk(%d) which is faulty",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	int ret = 0;

	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
		return -1;
	switch (le32_to_cpu(msg->type)) {
	case METADATA_UPDATED:
		process_metadata_update(mddev, msg);
		break;
	case CHANGE_CAPACITY:
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk_size(mddev->gendisk, true);
		break;
	case RESYNCING:
		set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		process_suspend_info(mddev, le32_to_cpu(msg->slot),
				     le64_to_cpu(msg->low),
				     le64_to_cpu(msg->high));
		break;
	case NEWDISK:
		process_add_new_disk(mddev, msg);
		break;
	case REMOVE:
		process_remove_disk(mddev, msg);
		break;
	case RE_ADD:
		process_readd_disk(mddev, msg);
		break;
	case BITMAP_NEEDS_SYNC:
		__recover_slot(mddev, le32_to_cpu(msg->slot));
		break;
	case BITMAP_RESIZE:
		if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
			ret = md_bitmap_resize(mddev->bitmap,
					       le64_to_cpu(msg->high), 0, 0);
		break;
	default:
		ret = -1;
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, msg->slot);
	}
	return ret;
}

/*
 * thread for receiving message
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;
	int ret;

	mutex_lock(&cinfo->recv_mutex);
	/*get CR on Message*/
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md/raid1:failed to get CR on MESSAGE\n");
		mutex_unlock(&cinfo->recv_mutex);
		return;
	}

	/* read lvb and wake up thread to process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	ret = process_recvd_msg(thread->mddev, &msg);
	if (ret)
		goto out;

	/*release CR on ack_lockres*/
	ret = dlm_unlock_sync(ack_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock ack failed return %d\n", ret);
	/*up-convert to PR on message_lockres*/
	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	if (unlikely(ret != 0))
		pr_info("lock PR on msg failed return %d\n", ret);
	/*get CR on ack_lockres again*/
	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	if (unlikely(ret != 0))
		pr_info("lock CR on ack failed return %d\n", ret);
out:
	/*release CR on message_lockres*/
	ret = dlm_unlock_sync(message_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock msg failed return %d\n", ret);
	mutex_unlock(&cinfo->recv_mutex);
}

/* lock_token()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_token(struct md_cluster_info *cinfo)
{
	int error;

	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);
	} else {
		/* Lock the receive sequence */
		mutex_lock(&cinfo->recv_mutex);
	}
	return error;
}

/* lock_comm()
 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
 */
static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
{
	int rv, set_bit = 0;
	struct mddev *mddev = cinfo->mddev;

	/*
	 * If the resync thread runs after the raid1d thread, then
	 * process_metadata_update could not continue while raid1d holds
	 * reconfig_mutex (and raid1d is blocked since another node already
	 * got EX on Token and is waiting for EX on Ack), so let resync wake
	 * up the thread in case the flag is set.
	 */
	if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				      &cinfo->state)) {
		rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
					   &cinfo->state);
		WARN_ON_ONCE(rv);
		md_wakeup_thread(mddev->thread);
		set_bit = 1;
	}

	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
	rv = lock_token(cinfo);
	if (set_bit)
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return rv;
}

static void unlock_comm(struct md_cluster_info *cinfo)
{
	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
	mutex_unlock(&cinfo->recv_mutex);
	dlm_unlock_sync(cinfo->token_lockres);
	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
	wake_up(&cinfo->wait);
}

/* __sendmsg()
 * This function performs the actual sending of the message. This function is
 * usually called after performing the encompassing operation
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 *    and the other nodes read the message. The thread will wait here until all other
 *    nodes have released ack lock resource.
 * 5. Downconvert ack lockresource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/*get EX on Message*/
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/*down-convert EX to CW on Message*/
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/*up-convert CR to EX on Ack*/
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/*down-convert EX to CR on Ack*/
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	error = dlm_unlock_sync(cinfo->message_lockres);
	if (unlikely(error != 0)) {
		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
			error);
		/* in case the message can't be released due to some reason */
		goto failed_ack;
	}
failed_message:
	return error;
}

static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
		   bool mddev_locked)
{
	int ret;

	ret = lock_comm(cinfo, mddev_locked);
	if (!ret) {
		ret = __sendmsg(cinfo, cmsg);
		unlock_comm(cinfo);
	}
	return ret;
}

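/*
 * Used by load_bitmaps(): walk every other node's bitmap lock resource to
 * pick up in-progress resync ranges (from the LVB) and any bitmap regions
 * that still need recovery.
 */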
static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i, ret = 0;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	sector_t lo, hi;


	for (i = 0; i < total_slots; i++) {
		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres)
			return -ENOMEM;
		if (i == (cinfo->slot_number - 1)) {
			lockres_free(bm_lockres);
			continue;
		}

		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret == -EAGAIN) {
			if (read_resync_info(mddev, bm_lockres)) {
				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
						__func__, __LINE__,
					(unsigned long long) cinfo->suspend_lo,
					(unsigned long long) cinfo->suspend_hi,
					i);
				cinfo->suspend_from = i;
			}
			ret = 0;
			lockres_free(bm_lockres);
			continue;
		}
		if (ret) {
			lockres_free(bm_lockres);
			goto out;
		}

		/* Read the disk bitmap sb and check if it needs recovery */
		ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
		if (ret) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
			lockres_free(bm_lockres);
			continue;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp)) {
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}

		lockres_free(bm_lockres);
	}
out:
	return ret;
}

static int join(struct mddev *mddev, int nodes)
{
	struct md_cluster_info *cinfo;
	int ret, ops_rv;
	char str[64];

	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
	if (!cinfo)
		return -ENOMEM;

	INIT_LIST_HEAD(&cinfo->suspend_list);
	spin_lock_init(&cinfo->suspend_lock);
	init_completion(&cinfo->completion);
	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	init_waitqueue_head(&cinfo->wait);
	mutex_init(&cinfo->recv_mutex);

	mddev->cluster_info = cinfo;
	cinfo->mddev = mddev;

	memset(str, 0, 64);
	sprintf(str, "%pU", mddev->uuid);
	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
				DLM_LSFL_FS, LVB_SIZE,
				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
	if (ret)
		goto err;
	wait_for_completion(&cinfo->completion);
	if (nodes < cinfo->slot_number) {
		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
			cinfo->slot_number, nodes);
		ret = -ERANGE;
		goto err;
	}
	/* Initiate the communication resources */
	ret = -ENOMEM;
	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
	if (!cinfo->recv_thread) {
		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
		goto err;
	}
	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
	if (!cinfo->message_lockres)
		goto err;
	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
	if (!cinfo->token_lockres)
		goto err;
	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
	if (!cinfo->no_new_dev_lockres)
		goto err;

	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (ret) {
		ret = -EAGAIN;
		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
		goto err;
	}
	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
	if (!cinfo->ack_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	/* get sync CR lock on ACK. */
	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
				ret);
	dlm_unlock_sync(cinfo->token_lockres);
	/* get sync CR lock on no-new-dev. */
	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);


	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
	if (!cinfo->bitmap_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
		pr_err("Failed to get bitmap lock\n");
		ret = -EINVAL;
		goto err;
	}

	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
	if (!cinfo->resync_lockres) {
		ret = -ENOMEM;
		goto err;
	}

	return 0;
err:
	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	if (cinfo->lockspace)
		dlm_release_lockspace(cinfo->lockspace, 2);
	mddev->cluster_info = NULL;
	kfree(cinfo);
	return ret;
}

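/*
 * Gather resync information from all slots once the node has joined, then
 * mark the node as fully in the cluster so that ack BASTs wake the receive
 * thread directly; handle any event that arrived before that point.
 */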
static void load_bitmaps(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	/* load all the node's bitmap info for resync */
	if (gather_all_resync_info(mddev, total_slots))
		pr_err("md-cluster: failed to gather all resync infos\n");
	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
	/* wake up recv thread in case something needs to be handled */
	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
		md_wakeup_thread(cinfo->recv_thread);
}

static void resync_bitmap(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int err;

	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
			__func__, __LINE__, err);
}

static void unlock_all_bitmaps(struct mddev *mddev);
static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;

	/*
	 * A BITMAP_NEEDS_SYNC message should be sent when a node
	 * is leaving the cluster with a dirty bitmap. Also, we can
	 * only deliver it when the dlm connection is available.
	 *
	 * Also, we should send BITMAP_NEEDS_SYNC in case
	 * reshaping is interrupted.
	 */
	if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
	    (mddev->reshape_position != MaxSector &&
	     test_bit(MD_CLOSING, &mddev->flags)))
		resync_bitmap(mddev);

	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	unlock_all_bitmaps(mddev);
	dlm_release_lockspace(cinfo->lockspace, 2);
	kfree(cinfo);
	return 0;
}

/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we deduct one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}

/*
 * Check if the communication is already locked, else lock the communication
 * channel.
 * If it is already locked, the token is in EX mode, and hence lock_token()
 * should not be called.
 */
static int metadata_update_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret;

	/*
	 * metadata_update_start is always called with the protection of
	 * reconfig_mutex, so set WAITING_FOR_TOKEN here.
	 */
	ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				    &cinfo->state);
	WARN_ON_ONCE(ret);
	md_wakeup_thread(mddev->thread);

	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));

	/* If token is already locked, return 0 */
	if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
		return 0;
	}

	ret = lock_token(cinfo);
	clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return ret;
}

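/*
 * Broadcast METADATA_UPDATED (with a good device number, if one exists) to
 * the other nodes and release the communication channel taken by
 * metadata_update_start().
 */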
static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	/* Pick up a good active device number to send. */
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		ret = __sendmsg(cinfo, &cmsg);
	} else
		pr_warn("md-cluster: No good device id found to send\n");
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
	return ret;
}

static void metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

static int update_bitmap_size(struct mddev *mddev, sector_t size)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int ret;

	cmsg.type = cpu_to_le32(BITMAP_RESIZE);
	cmsg.high = cpu_to_le64(size);
	ret = sendmsg(cinfo, &cmsg, 0);
	if (ret)
		pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
			__func__, __LINE__, ret);
	return ret;
}

static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
{
	struct bitmap_counts *counts;
	char str[64];
	struct dlm_lock_resource *bm_lockres;
	struct bitmap *bitmap = mddev->bitmap;
	unsigned long my_pages = bitmap->counts.pages;
	int i, rv;

	/*
	 * We need to ensure all the nodes can grow to a larger
	 * bitmap size before we do the reshaping.
	 */
	rv = update_bitmap_size(mddev, newsize);
	if (rv)
		return rv;

	for (i = 0; i < mddev->bitmap_info.nodes; i++) {
		if (i == md_cluster_ops->slot_number(mddev))
			continue;

		bitmap = get_bitmap_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			bitmap = NULL;
			goto out;
		}
		counts = &bitmap->counts;

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the pages.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("Cannot initialize %s lock\n", str);
			goto out;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			counts->pages = my_pages;
		lockres_free(bm_lockres);

		if (my_pages != counts->pages)
			/*
			 * Let's revert the bitmap size if one node
			 * can't resize its bitmap
			 */
			goto out;
		md_bitmap_free(bitmap);
	}

	return 0;
out:
	md_bitmap_free(bitmap);
	update_bitmap_size(mddev, oldsize);
	return -1;
}

/*
 * return 0 if all the bitmaps have the same sync_size
 */
static int cluster_check_sync_size(struct mddev *mddev)
{
	int i, rv;
	bitmap_super_t *sb;
	unsigned long my_sync_size, sync_size = 0;
	int node_num = mddev->bitmap_info.nodes;
	int current_slot = md_cluster_ops->slot_number(mddev);
	struct bitmap *bitmap = mddev->bitmap;
	char str[64];
	struct dlm_lock_resource *bm_lockres;

	sb = kmap_atomic(bitmap->storage.sb_page);
	my_sync_size = sb->sync_size;
	kunmap_atomic(sb);

	for (i = 0; i < node_num; i++) {
		if (i == current_slot)
			continue;

		bitmap = get_bitmap_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			return -1;
		}

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the sb.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize %s\n", str);
			md_bitmap_free(bitmap);
			return -1;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			md_bitmap_update_sb(bitmap);
		lockres_free(bm_lockres);

		sb = kmap_atomic(bitmap->storage.sb_page);
		if (sync_size == 0)
			sync_size = sb->sync_size;
		else if (sync_size != sb->sync_size) {
			kunmap_atomic(sb);
			md_bitmap_free(bitmap);
			return -1;
		}
		kunmap_atomic(sb);
		md_bitmap_free(bitmap);
	}

	return (my_sync_size == sync_size) ? 0 : -1;
}

/*
 * Updating the size for a clustered raid is a little more complex; we perform
 * it in these steps:
 * 1. hold the token lock and update the superblock in the initiator node.
 * 2. send the METADATA_UPDATED msg to the other nodes.
 * 3. The initiator node continues to check each bitmap's sync_size; if all
 *    bitmaps have the same value of sync_size, then we can set the capacity
 *    and let the other nodes perform it. If one node can't update sync_size
 *    accordingly, we need to revert to the previous value.
 */
static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	md_update_sb(mddev, 1);
	if (lock_comm(cinfo, 1)) {
		pr_err("%s: lock_comm failed\n", __func__);
		return;
	}

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		/*
		 * We can only change the capacity after all the nodes can do
		 * it, so we need to wait until the other nodes have received
		 * the msg and handled the change
		 */
		ret = __sendmsg(cinfo, &cmsg);
		if (ret) {
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
			unlock_comm(cinfo);
			return;
		}
	} else {
		pr_err("md-cluster: No good device id found to send\n");
		unlock_comm(cinfo);
		return;
	}

	/*
	 * check the sync_size from the other nodes' bitmaps; if sync_size
	 * has already been updated in the other nodes as expected, send an
	 * empty metadata msg to permit the change of capacity
	 */
	if (cluster_check_sync_size(mddev) == 0) {
		memset(&cmsg, 0, sizeof(cmsg));
		cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
			       __func__, __LINE__);
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk_size(mddev->gendisk, true);
	} else {
		/* revert to previous sectors */
		ret = mddev->pers->resize(mddev, old_dev_sectors);
		if (!ret)
			revalidate_disk_size(mddev->gendisk, true);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
	}
	unlock_comm(cinfo);
}

static int resync_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
}

static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	spin_lock_irq(&cinfo->suspend_lock);
	*lo = cinfo->suspend_lo;
	*hi = cinfo->suspend_hi;
	spin_unlock_irq(&cinfo->suspend_lock);
}

static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct resync_info ri;
	struct cluster_msg cmsg = {0};

	/* do not send zero again if we have sent it before */
	if (hi == 0) {
		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
		if (le64_to_cpu(ri.hi) == 0)
			return 0;
	}

	add_resync_info(cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
	cmsg.type = cpu_to_le32(RESYNCING);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);

	/*
	 * mddev_lock is held if resync_info_update is called from
	 * resync_finish (md_reap_sync_thread -> resync_finish)
	 */
	if (lo == 0 && hi == 0)
		return sendmsg(cinfo, &cmsg, 1);
	else
		return sendmsg(cinfo, &cmsg, 0);
}

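/*
 * Called when the local resync completes: tell the other nodes the resync
 * range is empty (unless the array is being closed, in which case another
 * node will continue the resync) and drop the resync lock.
 */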
static int resync_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);

	/*
	 * If the resync thread is interrupted so we can't say the resync is
	 * finished, another node will launch a resync thread to continue.
	 */
	if (!test_bit(MD_CLOSING, &mddev->flags))
		ret = resync_info_update(mddev, 0, 0);
	dlm_unlock_sync(cinfo->resync_lockres);
	return ret;
}

static int area_resyncing(struct mddev *mddev, int direction,
		sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	if ((direction == READ) &&
		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
		return 1;

	spin_lock_irq(&cinfo->suspend_lock);
	if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
		ret = 1;
	spin_unlock_irq(&cinfo->suspend_lock);
	return ret;
}

/* add_new_disk() - initiates a disk add
 * However, if this fails before writing md_update_sb(),
 * add_new_disk_cancel() must be called to release the token lock
 */
static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	if (lock_comm(cinfo, 1))
		return -EAGAIN;
	ret = __sendmsg(cinfo, &cmsg);
	if (ret) {
		unlock_comm(cinfo);
		return ret;
	}
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	if (ret)
		unlock_comm(cinfo);
	else {
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
		 * will run soon after add_new_disk, the below path will be
		 * invoked:
		 *	md_wakeup_thread(mddev->thread)
		 *	-> conf->thread (raid1d)
		 *	-> md_check_recovery -> md_update_sb
		 *	-> metadata_update_start/finish
		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
		 *
		 * For other failure cases, metadata_update_cancel and
		 * add_new_disk_cancel also clear the bit below as well.
		 */
		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
		wake_up(&cinfo->wait);
	}
	return ret;
}

static void add_new_disk_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}

static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct cluster_msg cmsg = {0};
	struct md_cluster_info *cinfo = mddev->cluster_info;
	cmsg.type = cpu_to_le32(REMOVE);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	return sendmsg(cinfo, &cmsg, 1);
}

static int lock_all_bitmaps(struct mddev *mddev)
{
	int slot, my_slot, ret, held = 1, i = 0;
	char str[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->other_bitmap_lockres =
		kcalloc(mddev->bitmap_info.nodes - 1,
			sizeof(struct dlm_lock_resource *), GFP_KERNEL);
	if (!cinfo->other_bitmap_lockres) {
		pr_err("md: can't alloc mem for other bitmap locks\n");
		return 0;
	}

	my_slot = slot_number(mddev);
	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
		if (slot == my_slot)
			continue;

		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", slot);
		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
		if (!cinfo->other_bitmap_lockres[i])
			return -ENOMEM;

		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
		if (ret)
			held = -1;
		i++;
	}

	return held;
}

static void unlock_all_bitmaps(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i;

	/* release the other nodes' bitmap locks if they exist */
	if (cinfo->other_bitmap_lockres) {
		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
			if (cinfo->other_bitmap_lockres[i]) {
				lockres_free(cinfo->other_bitmap_lockres[i]);
			}
		}
		kfree(cinfo->other_bitmap_lockres);
		cinfo->other_bitmap_lockres = NULL;
	}
}

static int gather_bitmaps(struct md_rdev *rdev)
{
	int sn, err;
	sector_t lo, hi;
	struct cluster_msg cmsg = {0};
	struct mddev *mddev = rdev->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(RE_ADD);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		goto out;

	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
		if (sn == (cinfo->slot_number - 1))
			continue;
		err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
		if (err) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
			goto out;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp))
			mddev->recovery_cp = lo;
	}
out:
	return err;
}

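/*
 * The operations exported to the MD core; registered with
 * register_md_cluster_operations() below.
 */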
static struct md_cluster_operations cluster_ops = {
	.join = join,
	.leave = leave,
	.slot_number = slot_number,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.resync_info_update = resync_info_update,
	.resync_info_get = resync_info_get,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk = add_new_disk,
	.add_new_disk_cancel = add_new_disk_cancel,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.load_bitmaps = load_bitmaps,
	.gather_bitmaps = gather_bitmaps,
	.resize_bitmaps = resize_bitmaps,
	.lock_all_bitmaps = lock_all_bitmaps,
	.unlock_all_bitmaps = unlock_all_bitmaps,
	.update_size = update_size,
};

static int __init cluster_init(void)
{
	pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}

static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_AUTHOR("SUSE");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");