// SPDX-License-Identifier: GPL-2.0-or-later
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * userdlm.c
 *
 * Code which implements the kernel side of a minimal userspace
 * interface to our DLM.
 *
 * Many of the functions here are pared down versions of dlmglue.c
 * functions.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 */

#include <linux/signal.h>
#include <linux/sched/signal.h>

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/crc32.h>

#include "../ocfs2_lockingver.h"
#include "../stackglue.h"
#include "userdlm.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "../cluster/masklog.h"

static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct user_lock_res, l_lksb);
}

static inline int user_check_wait_flag(struct user_lock_res *lockres,
				       int flag)
{
	int ret;

	spin_lock(&lockres->l_lock);
	ret = lockres->l_flags & flag;
	spin_unlock(&lockres->l_lock);

	return ret;
}

static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BUSY));
}

static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
}

/* I heart container_of... */
static inline struct ocfs2_cluster_connection *
cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return ip->ip_conn;
}

static struct inode *
user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return &ip->ip_vfs_inode;
}

static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
{
	spin_lock(&lockres->l_lock);
	lockres->l_flags &= ~USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);
}

#define user_log_dlm_error(_func, _stat, _lockres) do {			\
	mlog(ML_ERROR, "Dlm error %d while calling %s on "		\
		"resource %.*s\n", _stat, _func,			\
		_lockres->l_namelen, _lockres->l_name);			\
} while (0)

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int user_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
	int status;

	mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level,
	     lockres->l_requested);

	spin_lock(&lockres->l_lock);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
	if (status) {
		mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
		     status, lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		return;
	}

	mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
			"Lockres %.*s, requested ivmode. flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

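	/*
	 * From here on the AST is known to be for a successfully granted
	 * request: update the cached lock state to the granted mode, drop
	 * the BUSY flag, and wake anyone waiting in user_dlm_cluster_lock().
	 */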
	/* we're downconverting. */
	if (lockres->l_requested < lockres->l_level) {
		if (lockres->l_requested <=
		    user_highest_compat_lock_level(lockres->l_blocking)) {
			lockres->l_blocking = DLM_LOCK_NL;
			lockres->l_flags &= ~USER_LOCK_BLOCKED;
		}
	}

	lockres->l_level = lockres->l_requested;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_flags |= USER_LOCK_ATTACHED;
	lockres->l_flags &= ~USER_LOCK_BUSY;

	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	if (!igrab(inode))
		BUG();
}

static void user_dlm_unblock_lock(struct work_struct *work);

static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
{
	if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
		user_dlm_grab_inode_ref(lockres);

		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);

		queue_work(user_dlm_worker, &lockres->l_work);
		lockres->l_flags |= USER_LOCK_QUEUED;
	}
}

static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
{
	int queue = 0;

	if (!(lockres->l_flags & USER_LOCK_BLOCKED))
		return;

	switch (lockres->l_blocking) {
	case DLM_LOCK_EX:
		if (!lockres->l_ex_holders && !lockres->l_ro_holders)
			queue = 1;
		break;
	case DLM_LOCK_PR:
		if (!lockres->l_ex_holders)
			queue = 1;
		break;
	default:
		BUG();
	}

	if (queue)
		__user_dlm_queue_lockres(lockres);
}

static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
	     lockres->l_namelen, lockres->l_name, level, lockres->l_level);

	spin_lock(&lockres->l_lock);
	lockres->l_flags |= USER_LOCK_BLOCKED;
	if (level > lockres->l_blocking)
		lockres->l_blocking = level;

	__user_dlm_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_flags);

	if (status)
		mlog(ML_ERROR, "dlm returns status %d\n", status);

	spin_lock(&lockres->l_lock);
	/* The teardown flag gets set early during the unlock process,
	 * so test the cancel flag to make sure that this ast isn't
	 * for a concurrent cancel. */
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
	    && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
		lockres->l_level = DLM_LOCK_IV;
	} else if (status == DLM_CANCELGRANT) {
		/* We tried to cancel a convert request, but it was
		 * already granted. Don't clear the busy flag - the
		 * ast should've done this already. */
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		goto out_noclear;
	} else {
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		/* Cancel succeeded, we want to re-queue */
		lockres->l_requested = DLM_LOCK_IV; /* cancel an
						     * upconvert
						     * request. */
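		/*
		 * The convert was successfully cancelled, so the lock is
		 * still held at its previously granted level.
		 */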
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		/* we want the unblock thread to look at it again
		 * now. */
		if (lockres->l_flags & USER_LOCK_BLOCKED)
			__user_dlm_queue_lockres(lockres);
	}

	lockres->l_flags &= ~USER_LOCK_BUSY;
out_noclear:
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

/*
 * This is the userdlmfs locking protocol version.
 *
 * See fs/ocfs2/dlmglue.c for more details on locking versions.
 */
static struct ocfs2_locking_protocol user_dlm_lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast		= user_ast,
	.lp_blocking_ast	= user_bast,
	.lp_unlock_ast		= user_unlock_ast,
};

static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	iput(inode);
}

static void user_dlm_unblock_lock(struct work_struct *work)
{
	int new_level, status;
	struct user_lock_res *lockres =
		container_of(work, struct user_lock_res, l_work);
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);

	mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
			"Lockres %.*s, flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

	/* notice that we don't clear USER_LOCK_BLOCKED here. If it's
	 * set, we want user_ast to clear it. */
	lockres->l_flags &= ~USER_LOCK_QUEUED;

	/* It's valid to get here and no longer be blocked - if we get
	 * several basts in a row, we might be queued by the first
	 * one, the unblock thread might run and clear the queued
	 * flag, and finally we might get another bast which re-queues
	 * us before our ast for the downconvert is called. */
	if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_BUSY) {
		if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
			mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
			     lockres->l_namelen, lockres->l_name);
			spin_unlock(&lockres->l_lock);
			goto drop_ref;
		}

		lockres->l_flags |= USER_LOCK_IN_CANCEL;
		spin_unlock(&lockres->l_lock);

		status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
					  DLM_LKF_CANCEL);
		if (status)
			user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto drop_ref;
	}

	/* If there are still incompat holders, we can exit safely
	 * without worrying about re-queueing this lock as that will
	 * happen on the last call to user_dlm_cluster_unlock. */
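	/*
	 * A blocking EX request conflicts with any local holder, while a
	 * blocking PR request only conflicts with local EX holders.
	 */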
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders, lockres->l_ro_holders);
		goto drop_ref;
	}

	if ((lockres->l_blocking == DLM_LOCK_PR)
	    && lockres->l_ex_holders) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders);
		goto drop_ref;
	}

	/* yay, we can downconvert now. */
	new_level = user_highest_compat_lock_level(lockres->l_blocking);
	lockres->l_requested = new_level;
	lockres->l_flags |= USER_LOCK_BUSY;
	mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
	spin_unlock(&lockres->l_lock);

	/* need lock downconvert request now... */
	status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
				DLM_LKF_CONVERT|DLM_LKF_VALBLK,
				lockres->l_name,
				lockres->l_namelen);
	if (status) {
		user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
		user_recover_from_dlm_error(lockres);
	}

drop_ref:
	user_dlm_drop_inode_ref(lockres);
}

static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
					int level)
{
	switch (level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int
user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
				  int wanted)
{
	BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));

	return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
}

int user_dlm_cluster_lock(struct user_lock_res *lockres,
			  int level,
			  int lkm_flags)
{
	int status, local_flags;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		status = -EINVAL;
		goto bail;
	}

	mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
	     lockres->l_namelen, lockres->l_name, level, lkm_flags);

again:
	if (signal_pending(current)) {
		status = -ERESTARTSYS;
		goto bail;
	}

	spin_lock(&lockres->l_lock);
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		spin_unlock(&lockres->l_lock);
		status = -EAGAIN;
		goto bail;
	}

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if ((lockres->l_flags & USER_LOCK_BUSY) &&
	    (level > lockres->l_level)) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
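		/*
		 * Drop the spinlock before sleeping; the flags are
		 * re-checked from the top of the loop once the busy
		 * operation completes.
		 */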
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
	    (!user_may_continue_on_blocked_lock(lockres, level))) {
		/* the lock is currently blocked on behalf of
		 * another node. */
		spin_unlock(&lockres->l_lock);

		user_wait_on_blocked_lock(lockres);
		goto again;
	}

	if (level > lockres->l_level) {
		local_flags = lkm_flags | DLM_LKF_VALBLK;
		if (lockres->l_level != DLM_LOCK_IV)
			local_flags |= DLM_LKF_CONVERT;

		lockres->l_requested = level;
		lockres->l_flags |= USER_LOCK_BUSY;
		spin_unlock(&lockres->l_lock);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		/* call dlm_lock to upgrade lock now */
		status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
					local_flags, lockres->l_name,
					lockres->l_namelen);
		if (status) {
			if ((lkm_flags & DLM_LKF_NOQUEUE) &&
			    (status != -EAGAIN))
				user_log_dlm_error("ocfs2_dlm_lock",
						   status, lockres);
			user_recover_from_dlm_error(lockres);
			goto bail;
		}

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	user_dlm_inc_holders(lockres, level);
	spin_unlock(&lockres->l_lock);

	status = 0;
bail:
	return status;
}

static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
					int level)
{
	switch (level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

void user_dlm_cluster_unlock(struct user_lock_res *lockres,
			     int level)
{
	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		return;
	}

	spin_lock(&lockres->l_lock);
	user_dlm_dec_holders(lockres, level);
	__user_dlm_cond_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);
}

void user_dlm_write_lvb(struct inode *inode,
			const char *val,
			unsigned int len)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;

	BUG_ON(len > DLM_LVB_LEN);

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_EX);
	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	memcpy(lvb, val, len);

	spin_unlock(&lockres->l_lock);
}

bool user_dlm_read_lvb(struct inode *inode, char *val)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;
	bool ret = true;

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_PR);
	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
		memcpy(val, lvb, DLM_LVB_LEN);
	} else
		ret = false;

	spin_unlock(&lockres->l_lock);
	return ret;
}

void user_dlm_lock_res_init(struct user_lock_res *lockres,
			    struct dentry *dentry)
{
	memset(lockres, 0, sizeof(*lockres));

	spin_lock_init(&lockres->l_lock);
	init_waitqueue_head(&lockres->l_event);
	lockres->l_level = DLM_LOCK_IV;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_blocking = DLM_LOCK_IV;

	/* should have been checked before getting here. */
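	/*
	 * The lock name is copied verbatim from the dentry name, which the
	 * caller is expected to have bounded by USER_DLM_LOCK_ID_MAX_LEN.
	 */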
	BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);

	memcpy(lockres->l_name,
	       dentry->d_name.name,
	       dentry->d_name.len);
	lockres->l_namelen = dentry->d_name.len;
}

int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
	int status = -EBUSY;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	lockres->l_flags |= USER_LOCK_IN_TEARDOWN;

	while (lockres->l_flags & USER_LOCK_BUSY) {
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);

		spin_lock(&lockres->l_lock);
	}

	if (lockres->l_ro_holders || lockres->l_ex_holders) {
		lockres->l_flags &= ~USER_LOCK_IN_TEARDOWN;
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	status = 0;
	if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
		/*
		 * The lock was never requested; leave USER_LOCK_IN_TEARDOWN
		 * set to keep new lock requests from coming in.
		 */
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	lockres->l_flags &= ~USER_LOCK_ATTACHED;
	lockres->l_flags |= USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);

	status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
	if (status) {
		spin_lock(&lockres->l_lock);
		lockres->l_flags &= ~USER_LOCK_IN_TEARDOWN;
		lockres->l_flags &= ~USER_LOCK_BUSY;
		spin_unlock(&lockres->l_lock);
		user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto bail;
	}

	user_wait_on_busy_lock(lockres);

	status = 0;
bail:
	return status;
}

static void user_dlm_recovery_handler_noop(int node_num,
					   void *recovery_data)
{
	/* We ignore recovery events */
	return;
}

void user_dlm_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}

struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
{
	int rc;
	struct ocfs2_cluster_connection *conn;

	rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
					    &user_dlm_lproto,
					    user_dlm_recovery_handler_noop,
					    NULL, &conn);
	if (rc)
		mlog_errno(rc);

	return rc ? ERR_PTR(rc) : conn;
}

void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
	ocfs2_cluster_disconnect(conn, 0);
}