Lines Matching refs:ls
16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
203 static inline void dlm_lock_recovery(struct dlm_ls *ls)
205 down_read(&ls->ls_in_recovery);
208 void dlm_unlock_recovery(struct dlm_ls *ls)
210 up_read(&ls->ls_in_recovery);
213 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 return down_read_trylock(&ls->ls_in_recovery);
349 struct dlm_ls *ls = r->res_ls;
352 spin_lock(&ls->ls_rsbtbl[bucket].lock);
354 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
362 static int pre_rsb_struct(struct dlm_ls *ls)
367 spin_lock(&ls->ls_new_rsb_spin);
368 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
369 spin_unlock(&ls->ls_new_rsb_spin);
372 spin_unlock(&ls->ls_new_rsb_spin);
374 r1 = dlm_allocate_rsb(ls);
375 r2 = dlm_allocate_rsb(ls);
377 spin_lock(&ls->ls_new_rsb_spin);
379 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
380 ls->ls_new_rsb_count++;
383 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
384 ls->ls_new_rsb_count++;
386 count = ls->ls_new_rsb_count;
387 spin_unlock(&ls->ls_new_rsb_spin);
394 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
398 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
404 spin_lock(&ls->ls_new_rsb_spin);
405 if (list_empty(&ls->ls_new_rsb)) {
406 count = ls->ls_new_rsb_count;
407 spin_unlock(&ls->ls_new_rsb_spin);
408 log_debug(ls, "find_rsb retry %d %d %s",
413 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
417 ls->ls_new_rsb_count--;
418 spin_unlock(&ls->ls_new_rsb_spin);
420 r->res_ls = ls;
543 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
587 error = pre_rsb_struct(ls);
592 spin_lock(&ls->ls_rsbtbl[b].lock);
594 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
608 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
622 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
631 log_error(ls, "find_rsb toss from_dir %d master %d",
648 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
649 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
661 error = get_rsb_struct(ls, name, len, &r);
663 spin_unlock(&ls->ls_rsbtbl[b].lock);
676 log_debug(ls, "find_rsb new from_dir %d recreate %s",
685 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
694 log_debug(ls, "find_rsb new from_other %d dir %d %s",
710 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
712 spin_unlock(&ls->ls_rsbtbl[b].lock);
722 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
733 error = pre_rsb_struct(ls);
737 spin_lock(&ls->ls_rsbtbl[b].lock);
739 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
752 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
765 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
776 log_error(ls, "find_rsb toss our %d master %d dir %d",
783 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
784 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
793 error = get_rsb_struct(ls, name, len, &r);
795 spin_unlock(&ls->ls_rsbtbl[b].lock);
808 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
810 spin_unlock(&ls->ls_rsbtbl[b].lock);
816 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
826 b = hash & (ls->ls_rsbtbl_size - 1);
828 dir_nodeid = dlm_hash2nodeid(ls, hash);
830 if (dlm_no_directory(ls))
831 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
834 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
841 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
844 if (dlm_no_directory(ls)) {
845 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
858 log_debug(ls, "validate master from_other %d master %d "
869 log_error(ls, "validate master from_dir %d master %d "
910 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
924 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
930 b = hash & (ls->ls_rsbtbl_size - 1);
932 dir_nodeid = dlm_hash2nodeid(ls, hash);
934 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
936 ls->ls_num_nodes);
942 error = pre_rsb_struct(ls);
946 spin_lock(&ls->ls_rsbtbl[b].lock);
947 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
953 spin_unlock(&ls->ls_rsbtbl[b].lock);
958 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
969 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
974 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
986 log_error(ls, "dlm_master_lookup fix_master on toss");
996 log_limit(ls, "dlm_master_lookup from_master %d "
1002 log_error(ls, "from_master %d our_master", from_nodeid);
1016 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1028 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1041 spin_unlock(&ls->ls_rsbtbl[b].lock);
1050 error = get_rsb_struct(ls, name, len, &r);
1052 spin_unlock(&ls->ls_rsbtbl[b].lock);
1066 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1070 spin_unlock(&ls->ls_rsbtbl[b].lock);
1079 spin_unlock(&ls->ls_rsbtbl[b].lock);
1083 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1089 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1090 spin_lock(&ls->ls_rsbtbl[i].lock);
1091 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1096 spin_unlock(&ls->ls_rsbtbl[i].lock);
1100 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1107 b = hash & (ls->ls_rsbtbl_size - 1);
1109 spin_lock(&ls->ls_rsbtbl[b].lock);
1110 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1114 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1120 spin_unlock(&ls->ls_rsbtbl[b].lock);
1126 struct dlm_ls *ls = r->res_ls;
1130 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1131 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1133 ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1181 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1186 lkb = dlm_allocate_lkb(ls);
1201 spin_lock(&ls->ls_lkbidr_spin);
1202 rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
1205 spin_unlock(&ls->ls_lkbidr_spin);
1209 log_error(ls, "create_lkb idr error %d", rv);
1218 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1222 spin_lock(&ls->ls_lkbidr_spin);
1223 lkb = idr_find(&ls->ls_lkbidr, lkid);
1226 spin_unlock(&ls->ls_lkbidr_spin);
1245 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1249 spin_lock(&ls->ls_lkbidr_spin);
1251 idr_remove(&ls->ls_lkbidr, lkid);
1252 spin_unlock(&ls->ls_lkbidr_spin);
1262 spin_unlock(&ls->ls_lkbidr_spin);
1269 struct dlm_ls *ls;
1274 ls = lkb->lkb_resource->res_ls;
1275 return __put_lkb(ls, lkb);
1393 void dlm_scan_waiters(struct dlm_ls *ls)
1406 mutex_lock(&ls->ls_waiters_mutex);
1408 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1426 num_nodes = ls->ls_num_nodes;
1434 log_error(ls, "waitwarn %x %lld %d us check connection to "
1438 mutex_unlock(&ls->ls_waiters_mutex);
1442 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1452 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1455 mutex_lock(&ls->ls_waiters_mutex);
1478 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1493 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1496 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1499 mutex_unlock(&ls->ls_waiters_mutex);
1511 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1515 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1522 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1533 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1549 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1566 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1578 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1597 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1600 mutex_lock(&ls->ls_waiters_mutex);
1602 mutex_unlock(&ls->ls_waiters_mutex);
1611 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1615 mutex_lock(&ls->ls_waiters_mutex);
1618 mutex_unlock(&ls->ls_waiters_mutex);
1629 struct dlm_ls *ls = r->res_ls;
1631 spin_lock(&ls->ls_remove_spin);
1632 if (ls->ls_remove_len &&
1633 !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1634 log_debug(ls, "delay lookup for remove dir %d %s",
1636 spin_unlock(&ls->ls_remove_spin);
1640 spin_unlock(&ls->ls_remove_spin);
1650 static void shrink_bucket(struct dlm_ls *ls, int b)
1660 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1662 spin_lock(&ls->ls_rsbtbl[b].lock);
1664 if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1665 spin_unlock(&ls->ls_rsbtbl[b].lock);
1669 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1678 if (!dlm_no_directory(ls) &&
1691 if (!dlm_no_directory(ls) &&
1699 ls->ls_remove_lens[remote_count] = r->res_length;
1700 memcpy(ls->ls_remove_names[remote_count], r->res_name,
1710 log_error(ls, "tossed rsb in use %s", r->res_name);
1714 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1719 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1721 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
1722 spin_unlock(&ls->ls_rsbtbl[b].lock);
1739 name = ls->ls_remove_names[i];
1740 len = ls->ls_remove_lens[i];
1742 spin_lock(&ls->ls_rsbtbl[b].lock);
1743 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1745 spin_unlock(&ls->ls_rsbtbl[b].lock);
1746 log_debug(ls, "remove_name not toss %s", name);
1751 spin_unlock(&ls->ls_rsbtbl[b].lock);
1752 log_debug(ls, "remove_name master %d dir %d our %d %s",
1760 spin_unlock(&ls->ls_rsbtbl[b].lock);
1761 log_error(ls, "remove_name dir %d master %d our %d %s",
1769 spin_unlock(&ls->ls_rsbtbl[b].lock);
1770 log_debug(ls, "remove_name toss_time %lu now %lu %s",
1776 spin_unlock(&ls->ls_rsbtbl[b].lock);
1777 log_error(ls, "remove_name in use %s", name);
1781 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1784 spin_lock(&ls->ls_remove_spin);
1785 ls->ls_remove_len = len;
1786 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1787 spin_unlock(&ls->ls_remove_spin);
1788 spin_unlock(&ls->ls_rsbtbl[b].lock);
1793 spin_lock(&ls->ls_remove_spin);
1794 ls->ls_remove_len = 0;
1795 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1796 spin_unlock(&ls->ls_remove_spin);
1802 void dlm_scan_rsbs(struct dlm_ls *ls)
1806 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1807 shrink_bucket(ls, i);
1808 if (dlm_locking_stopped(ls))
1816 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1821 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1832 mutex_lock(&ls->ls_timeout_mutex);
1834 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1835 mutex_unlock(&ls->ls_timeout_mutex);
1840 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1842 mutex_lock(&ls->ls_timeout_mutex);
1847 mutex_unlock(&ls->ls_timeout_mutex);
1856 void dlm_scan_timeout(struct dlm_ls *ls)
1864 if (dlm_locking_stopped(ls))
1869 mutex_lock(&ls->ls_timeout_mutex);
1870 list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {
1889 mutex_unlock(&ls->ls_timeout_mutex);
1907 log_debug(ls, "timeout cancel %x node %d %s",
1924 void dlm_adjust_timeouts(struct dlm_ls *ls)
1927 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1929 ls->ls_recover_begin = 0;
1930 mutex_lock(&ls->ls_timeout_mutex);
1931 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1933 mutex_unlock(&ls->ls_timeout_mutex);
1938 mutex_lock(&ls->ls_waiters_mutex);
1939 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1943 mutex_unlock(&ls->ls_waiters_mutex);
2889 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2926 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2942 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2946 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2956 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2966 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
3063 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3317 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3323 error = validate_lock_args(ls, lkb, args);
3327 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3343 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3354 error = validate_lock_args(ls, lkb, args);
3365 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3387 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3424 struct dlm_ls *ls;
3429 ls = dlm_find_lockspace_local(lockspace);
3430 if (!ls)
3433 dlm_lock_recovery(ls);
3436 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3438 error = create_lkb(ls, &lkb);
3449 error = convert_lock(ls, lkb, &args);
3451 error = request_lock(ls, lkb, name, namelen, &args);
3457 __put_lkb(ls, lkb);
3461 dlm_unlock_recovery(ls);
3462 dlm_put_lockspace(ls);
3472 struct dlm_ls *ls;
3477 ls = dlm_find_lockspace_local(lockspace);
3478 if (!ls)
3481 dlm_lock_recovery(ls);
3483 error = find_lkb(ls, lkid, &lkb);
3492 error = cancel_lock(ls, lkb, &args);
3494 error = unlock_lock(ls, lkb, &args);
3503 dlm_unlock_recovery(ls);
3504 dlm_put_lockspace(ls);
3530 static int _create_message(struct dlm_ls *ls, int mb_len,
3552 ms->m_header.h_lockspace = ls->ls_global_id;
3842 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3845 struct dlm_rsb *r = &ls->ls_stub_rsb;
3890 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3897 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3901 if (len > ls->ls_lvblen)
3902 len = ls->ls_lvblen;
3918 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3932 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3940 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3946 if (receive_lvb(ls, lkb, ms))
3955 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3958 if (receive_lvb(ls, lkb, ms))
3966 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3968 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
4026 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4039 b = hash & (ls->ls_rsbtbl_size - 1);
4041 dir_nodeid = dlm_hash2nodeid(ls, hash);
4043 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4045 spin_lock(&ls->ls_rsbtbl[b].lock);
4046 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4048 spin_unlock(&ls->ls_rsbtbl[b].lock);
4049 log_error(ls, "repeat_remove on keep %s", name);
4053 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4055 spin_unlock(&ls->ls_rsbtbl[b].lock);
4056 log_error(ls, "repeat_remove on toss %s", name);
4060 /* use ls->remove_name2 to avoid conflict with shrink? */
4062 spin_lock(&ls->ls_remove_spin);
4063 ls->ls_remove_len = len;
4064 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4065 spin_unlock(&ls->ls_remove_spin);
4066 spin_unlock(&ls->ls_rsbtbl[b].lock);
4068 rv = _create_message(ls, sizeof(struct dlm_message) + len,
4079 spin_lock(&ls->ls_remove_spin);
4080 ls->ls_remove_len = 0;
4081 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4082 spin_unlock(&ls->ls_remove_spin);
4085 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4094 error = create_lkb(ls, &lkb);
4100 error = receive_request_args(ls, lkb, ms);
4102 __put_lkb(ls, lkb);
4114 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4117 __put_lkb(ls, lkb);
4124 error = validate_master_nodeid(ls, r, from_nodeid);
4128 __put_lkb(ls, lkb);
4164 log_limit(ls, "receive_request %x from %d %d",
4169 send_repeat_remove(ls, ms->m_extra, namelen);
4173 setup_stub_lkb(ls, ms);
4174 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4178 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4184 error = find_lkb(ls, ms->m_remid, &lkb);
4189 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4209 error = receive_convert_args(ls, lkb, ms);
4228 setup_stub_lkb(ls, ms);
4229 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4233 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4239 error = find_lkb(ls, ms->m_remid, &lkb);
4244 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4263 error = receive_unlock_args(ls, lkb, ms);
4279 setup_stub_lkb(ls, ms);
4280 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4284 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4290 error = find_lkb(ls, ms->m_remid, &lkb);
4315 setup_stub_lkb(ls, ms);
4316 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4320 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4326 error = find_lkb(ls, ms->m_remid, &lkb);
4351 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4357 error = find_lkb(ls, ms->m_remid, &lkb);
4379 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4388 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4393 receive_request(ls, ms);
4396 send_lookup_reply(ls, ms, ret_nodeid, error);
4399 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4411 log_error(ls, "receive_remove from %d bad len %d",
4416 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4418 log_error(ls, "receive_remove from %d bad nodeid %d",
4436 b = hash & (ls->ls_rsbtbl_size - 1);
4438 spin_lock(&ls->ls_rsbtbl[b].lock);
4440 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4443 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4446 log_error(ls, "receive_remove from %d not found %s",
4448 spin_unlock(&ls->ls_rsbtbl[b].lock);
4453 log_error(ls, "receive_remove keep from %d master %d",
4456 spin_unlock(&ls->ls_rsbtbl[b].lock);
4460 log_debug(ls, "receive_remove from %d master %d first %x %s",
4463 spin_unlock(&ls->ls_rsbtbl[b].lock);
4468 log_error(ls, "receive_remove toss from %d master %d",
4471 spin_unlock(&ls->ls_rsbtbl[b].lock);
4476 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4477 spin_unlock(&ls->ls_rsbtbl[b].lock);
4480 log_error(ls, "receive_remove from %d rsb ref error",
4483 spin_unlock(&ls->ls_rsbtbl[b].lock);
4487 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4489 do_purge(ls, ms->m_nodeid, ms->m_pid);
4492 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4499 error = find_lkb(ls, ms->m_remid, &lkb);
4514 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4559 log_limit(ls, "receive_request_reply %x from %d %d "
4586 log_error(ls, "receive_request_reply %x error %d",
4591 log_debug(ls, "receive_request_reply %x result %d unlock",
4597 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4679 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4684 error = find_lkb(ls, ms->m_remid, &lkb);
4729 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4734 error = find_lkb(ls, ms->m_remid, &lkb);
4779 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4784 error = find_lkb(ls, ms->m_remid, &lkb);
4793 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4800 error = find_lkb(ls, ms->m_lkid, &lkb);
4802 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4827 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4841 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4853 log_debug(ls, "receive_lookup_reply %x unlock %x",
4871 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4876 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4877 log_limit(ls, "receive %d from non-member %d %x %x %d",
4888 error = receive_request(ls, ms);
4892 error = receive_convert(ls, ms);
4896 error = receive_unlock(ls, ms);
4901 error = receive_cancel(ls, ms);
4907 error = receive_request_reply(ls, ms);
4911 error = receive_convert_reply(ls, ms);
4915 error = receive_unlock_reply(ls, ms);
4919 error = receive_cancel_reply(ls, ms);
4926 error = receive_grant(ls, ms);
4931 error = receive_bast(ls, ms);
4937 receive_lookup(ls, ms);
4941 receive_remove(ls, ms);
4947 receive_lookup_reply(ls, ms);
4953 receive_purge(ls, ms);
4957 log_error(ls, "unknown message type %d", ms->m_type);
4972 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4976 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4981 dlm_dump_rsb_hash(ls, ms->m_hash);
4985 log_error(ls, "receive %d inval from %d lkid %x remid %x "
5000 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
5003 if (dlm_locking_stopped(ls)) {
5007 if (!ls->ls_generation) {
5008 log_limit(ls, "receive %d from %d ignore old gen",
5013 dlm_add_requestqueue(ls, nodeid, ms);
5015 dlm_wait_requestqueue(ls);
5016 _receive_message(ls, ms, 0);
5023 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5026 _receive_message(ls, ms, saved_seq);
5037 struct dlm_ls *ls;
5060 ls = dlm_find_lockspace_global(hd->h_lockspace);
5061 if (!ls) {
5074 be inactive (in this ls) before transitioning to recovery mode */
5076 down_read(&ls->ls_recv_active);
5078 dlm_receive_message(ls, &p->message, nodeid);
5080 dlm_receive_rcom(ls, &p->rcom, nodeid);
5081 up_read(&ls->ls_recv_active);
5083 dlm_put_lockspace(ls);
5086 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5114 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5117 if (dlm_no_directory(ls))
5120 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5132 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5143 mutex_lock(&ls->ls_waiters_mutex);
5145 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5153 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5172 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5196 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5208 recover_convert_waiter(ls, lkb, ms_stub);
5234 log_error(ls, "invalid lkb wait_type %d %d",
5239 mutex_unlock(&ls->ls_waiters_mutex);
5243 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5247 mutex_lock(&ls->ls_waiters_mutex);
5248 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5255 mutex_unlock(&ls->ls_waiters_mutex);
5276 int dlm_recover_waiters_post(struct dlm_ls *ls)
5283 if (dlm_locking_stopped(ls)) {
5284 log_debug(ls, "recover_waiters_post aborted");
5289 lkb = find_resend_waiter(ls);
5302 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5323 mutex_lock(&ls->ls_waiters_mutex);
5325 mutex_unlock(&ls->ls_waiters_mutex);
5364 log_error(ls, "waiter %x msg %d r_nodeid %d "
5377 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5389 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5396 log_error(ls, "purged mstcpy lkb not released");
5402 struct dlm_ls *ls = r->res_ls;
5404 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5405 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5406 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5409 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5420 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5433 log_error(ls, "purged dead lkb not released");
5444 void dlm_recover_purge(struct dlm_ls *ls)
5455 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5463 down_write(&ls->ls_root_sem);
5464 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5468 purge_dead_list(ls, r, &r->res_grantqueue,
5470 purge_dead_list(ls, r, &r->res_convertqueue,
5472 purge_dead_list(ls, r, &r->res_waitqueue,
5479 up_write(&ls->ls_root_sem);
5482 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5486 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5491 spin_lock(&ls->ls_rsbtbl[bucket].lock);
5492 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5502 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5505 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5526 void dlm_recover_grant(struct dlm_ls *ls)
5535 r = find_grant_rsb(ls, bucket);
5537 if (bucket == ls->ls_rsbtbl_size - 1)
5556 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5590 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5612 if (lvblen > ls->ls_lvblen)
5614 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5641 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5665 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5672 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5673 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5685 error = create_lkb(ls, &lkb);
5689 error = receive_rcom_lock_args(ls, lkb, r, rc);
5691 __put_lkb(ls, lkb);
5698 ls->ls_recover_locks_in++;
5708 lkb->lkb_recover_seq = ls->ls_recover_seq;
5715 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5722 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5734 error = find_lkb(ls, lkid, &lkb);
5736 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5746 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5761 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5771 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5786 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5794 dlm_lock_recovery(ls);
5796 error = create_lkb(ls, &lkb);
5806 __put_lkb(ls, lkb);
5817 __put_lkb(ls, lkb);
5825 error = request_lock(ls, lkb, name, namelen, &args);
5837 __put_lkb(ls, lkb);
5847 dlm_unlock_recovery(ls);
5851 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5860 dlm_lock_recovery(ls);
5862 error = find_lkb(ls, lkid, &lkb);
5893 error = convert_lock(ls, lkb, &args);
5900 dlm_unlock_recovery(ls);
5911 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5920 mutex_lock(&ls->ls_orphans_mutex);
5921 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5937 mutex_unlock(&ls->ls_orphans_mutex);
5976 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5984 dlm_lock_recovery(ls);
5986 error = find_lkb(ls, lkid, &lkb);
6002 error = unlock_lock(ls, lkb, &args);
6020 dlm_unlock_recovery(ls);
6025 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6033 dlm_lock_recovery(ls);
6035 error = find_lkb(ls, lkid, &lkb);
6048 error = cancel_lock(ls, lkb, &args);
6058 dlm_unlock_recovery(ls);
6063 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6071 dlm_lock_recovery(ls);
6073 error = find_lkb(ls, lkid, &lkb);
6107 dlm_unlock_recovery(ls);
6114 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6120 mutex_lock(&ls->ls_orphans_mutex);
6121 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6122 mutex_unlock(&ls->ls_orphans_mutex);
6126 error = cancel_lock(ls, lkb, &args);
6137 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6145 error = unlock_lock(ls, lkb, &args);
6155 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6160 mutex_lock(&ls->ls_clear_proc_locks);
6172 mutex_unlock(&ls->ls_clear_proc_locks);
6186 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6190 dlm_lock_recovery(ls);
6193 lkb = del_proc_lock(ls, proc);
6198 orphan_proc_lock(ls, lkb);
6200 unlock_proc_lock(ls, lkb);
6209 mutex_lock(&ls->ls_clear_proc_locks);
6225 mutex_unlock(&ls->ls_clear_proc_locks);
6226 dlm_unlock_recovery(ls);
6229 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6247 unlock_proc_lock(ls, lkb);
6271 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6275 mutex_lock(&ls->ls_orphans_mutex);
6276 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6279 unlock_proc_lock(ls, lkb);
6283 mutex_unlock(&ls->ls_orphans_mutex);
6286 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6292 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6302 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6308 error = send_purge(ls, nodeid, pid);
6310 dlm_lock_recovery(ls);
6312 purge_proc_locks(ls, proc);
6314 do_purge(ls, nodeid, pid);
6315 dlm_unlock_recovery(ls);