Lines Matching refs:ls
16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
204 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 down_read(&ls->ls_in_recovery);
209 void dlm_unlock_recovery(struct dlm_ls *ls)
211 up_read(&ls->ls_in_recovery);
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 return down_read_trylock(&ls->ls_in_recovery);
341 struct dlm_ls *ls = r->res_ls;
346 &ls->ls_rsbtbl[bucket].lock);
348 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
356 static int pre_rsb_struct(struct dlm_ls *ls)
361 spin_lock(&ls->ls_new_rsb_spin);
362 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
363 spin_unlock(&ls->ls_new_rsb_spin);
366 spin_unlock(&ls->ls_new_rsb_spin);
368 r1 = dlm_allocate_rsb(ls);
369 r2 = dlm_allocate_rsb(ls);
371 spin_lock(&ls->ls_new_rsb_spin);
373 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
374 ls->ls_new_rsb_count++;
377 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
378 ls->ls_new_rsb_count++;
380 count = ls->ls_new_rsb_count;
381 spin_unlock(&ls->ls_new_rsb_spin);
388 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
392 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
398 spin_lock(&ls->ls_new_rsb_spin);
399 if (list_empty(&ls->ls_new_rsb)) {
400 count = ls->ls_new_rsb_count;
401 spin_unlock(&ls->ls_new_rsb_spin);
402 log_debug(ls, "find_rsb retry %d %d %s",
408 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
412 ls->ls_new_rsb_count--;
413 spin_unlock(&ls->ls_new_rsb_spin);
415 r->res_ls = ls;
538 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
582 error = pre_rsb_struct(ls);
587 spin_lock(&ls->ls_rsbtbl[b].lock);
589 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
602 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
616 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625 log_error(ls, "find_rsb toss from_dir %d master %d",
642 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
643 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
655 error = get_rsb_struct(ls, name, len, &r);
657 spin_unlock(&ls->ls_rsbtbl[b].lock);
670 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688 log_debug(ls, "find_rsb new from_other %d dir %d %s",
704 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
706 spin_unlock(&ls->ls_rsbtbl[b].lock);
716 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
727 error = pre_rsb_struct(ls);
731 spin_lock(&ls->ls_rsbtbl[b].lock);
733 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
746 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
759 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
770 log_error(ls, "find_rsb toss our %d master %d dir %d",
777 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
778 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
787 error = get_rsb_struct(ls, name, len, &r);
789 spin_unlock(&ls->ls_rsbtbl[b].lock);
802 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
804 spin_unlock(&ls->ls_rsbtbl[b].lock);
810 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
821 b = hash & (ls->ls_rsbtbl_size - 1);
823 dir_nodeid = dlm_hash2nodeid(ls, hash);
825 if (dlm_no_directory(ls))
826 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
829 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
836 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
839 if (dlm_no_directory(ls)) {
840 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
853 log_debug(ls, "validate master from_other %d master %d "
864 log_error(ls, "validate master from_dir %d master %d "
876 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
885 log_error(ls, "%s res_dir %d our %d %s", __func__,
890 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
903 log_error(ls, "%s fix_master on toss", __func__);
914 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
919 log_error(ls, "from_master %d our_master", from_nodeid);
934 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
947 log_limit(ls, "%s from master %d flags %x first %x %s",
987 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
999 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1005 b = hash & (ls->ls_rsbtbl_size - 1);
1007 dir_nodeid = dlm_hash2nodeid(ls, hash);
1009 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1011 ls->ls_num_nodes);
1017 error = pre_rsb_struct(ls);
1021 spin_lock(&ls->ls_rsbtbl[b].lock);
1022 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1029 spin_unlock(&ls->ls_rsbtbl[b].lock);
1032 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1042 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1050 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1055 spin_unlock(&ls->ls_rsbtbl[b].lock);
1060 error = get_rsb_struct(ls, name, len, &r);
1062 spin_unlock(&ls->ls_rsbtbl[b].lock);
1076 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1080 spin_unlock(&ls->ls_rsbtbl[b].lock);
1088 spin_unlock(&ls->ls_rsbtbl[b].lock);
1092 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1098 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1099 spin_lock(&ls->ls_rsbtbl[i].lock);
1100 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1105 spin_unlock(&ls->ls_rsbtbl[i].lock);
1109 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1116 b = hash & (ls->ls_rsbtbl_size - 1);
1118 spin_lock(&ls->ls_rsbtbl[b].lock);
1119 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1123 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1129 spin_unlock(&ls->ls_rsbtbl[b].lock);
1135 struct dlm_ls *ls = r->res_ls;
1139 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1140 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1142 set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
1190 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1196 lkb = dlm_allocate_lkb(ls);
1212 spin_lock(&ls->ls_lkbidr_spin);
1213 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1216 spin_unlock(&ls->ls_lkbidr_spin);
1220 log_error(ls, "create_lkb idr error %d", rv);
1229 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1231 return _create_lkb(ls, lkb_ret, 1, 0);
1234 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1238 spin_lock(&ls->ls_lkbidr_spin);
1239 lkb = idr_find(&ls->ls_lkbidr, lkid);
1242 spin_unlock(&ls->ls_lkbidr_spin);
1261 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1267 &ls->ls_lkbidr_spin);
1269 idr_remove(&ls->ls_lkbidr, lkid);
1270 spin_unlock(&ls->ls_lkbidr_spin);
1285 struct dlm_ls *ls;
1290 ls = lkb->lkb_resource->res_ls;
1291 return __put_lkb(ls, lkb);
1408 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1412 mutex_lock(&ls->ls_waiters_mutex);
1435 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1446 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1449 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1452 mutex_unlock(&ls->ls_waiters_mutex);
1464 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1469 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1476 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1486 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1502 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1518 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1530 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1548 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1551 mutex_lock(&ls->ls_waiters_mutex);
1553 mutex_unlock(&ls->ls_waiters_mutex);
1563 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1567 mutex_lock(&ls->ls_waiters_mutex);
1570 mutex_unlock(&ls->ls_waiters_mutex);
1574 static void shrink_bucket(struct dlm_ls *ls, int b)
1584 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1586 spin_lock(&ls->ls_rsbtbl[b].lock);
1588 if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
1589 spin_unlock(&ls->ls_rsbtbl[b].lock);
1593 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1602 if (!dlm_no_directory(ls) &&
1615 if (!dlm_no_directory(ls) &&
1623 ls->ls_remove_lens[remote_count] = r->res_length;
1624 memcpy(ls->ls_remove_names[remote_count], r->res_name,
1634 log_error(ls, "tossed rsb in use %s", r->res_name);
1638 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1643 set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1645 clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1646 spin_unlock(&ls->ls_rsbtbl[b].lock);
1658 name = ls->ls_remove_names[i];
1659 len = ls->ls_remove_lens[i];
1661 spin_lock(&ls->ls_rsbtbl[b].lock);
1662 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1664 spin_unlock(&ls->ls_rsbtbl[b].lock);
1665 log_debug(ls, "remove_name not toss %s", name);
1670 spin_unlock(&ls->ls_rsbtbl[b].lock);
1671 log_debug(ls, "remove_name master %d dir %d our %d %s",
1679 spin_unlock(&ls->ls_rsbtbl[b].lock);
1680 log_error(ls, "remove_name dir %d master %d our %d %s",
1688 spin_unlock(&ls->ls_rsbtbl[b].lock);
1689 log_debug(ls, "remove_name toss_time %lu now %lu %s",
1695 spin_unlock(&ls->ls_rsbtbl[b].lock);
1696 log_error(ls, "remove_name in use %s", name);
1700 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1702 spin_unlock(&ls->ls_rsbtbl[b].lock);
1708 void dlm_scan_rsbs(struct dlm_ls *ls)
1712 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1713 shrink_bucket(ls, i);
1714 if (dlm_locking_stopped(ls))
2659 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2701 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2707 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2726 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2739 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2752 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2763 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2846 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2852 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3107 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3114 error = validate_lock_args(ls, lkb, args);
3118 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3134 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3145 error = validate_lock_args(ls, lkb, args);
3156 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3178 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3215 struct dlm_ls *ls;
3220 ls = dlm_find_lockspace_local(lockspace);
3221 if (!ls)
3224 dlm_lock_recovery(ls);
3227 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3229 error = create_lkb(ls, &lkb);
3234 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3242 error = convert_lock(ls, lkb, &args);
3244 error = request_lock(ls, lkb, name, namelen, &args);
3249 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3252 __put_lkb(ls, lkb);
3256 dlm_unlock_recovery(ls);
3257 dlm_put_lockspace(ls);
3267 struct dlm_ls *ls;
3272 ls = dlm_find_lockspace_local(lockspace);
3273 if (!ls)
3276 dlm_lock_recovery(ls);
3278 error = find_lkb(ls, lkid, &lkb);
3282 trace_dlm_unlock_start(ls, lkb, flags);
3289 error = cancel_lock(ls, lkb, &args);
3291 error = unlock_lock(ls, lkb, &args);
3298 trace_dlm_unlock_end(ls, lkb, flags, error);
3302 dlm_unlock_recovery(ls);
3303 dlm_put_lockspace(ls);
3329 static int _create_message(struct dlm_ls *ls, int mb_len,
3350 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3644 static int send_lookup_reply(struct dlm_ls *ls,
3648 struct dlm_rsb *r = &ls->ls_local_rsb;
3695 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3702 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3706 if (len > ls->ls_lvblen)
3707 len = ls->ls_lvblen;
3723 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3737 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3745 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3751 if (receive_lvb(ls, lkb, ms))
3760 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3763 if (receive_lvb(ls, lkb, ms))
3771 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3773 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3833 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3842 error = create_lkb(ls, &lkb);
3848 error = receive_request_args(ls, lkb, ms);
3850 __put_lkb(ls, lkb);
3862 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
3865 __put_lkb(ls, lkb);
3872 error = validate_master_nodeid(ls, r, from_nodeid);
3876 __put_lkb(ls, lkb);
3903 log_limit(ls, "receive_request %x from %d %d",
3907 setup_local_lkb(ls, ms);
3908 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3912 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
3918 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3923 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3944 error = receive_convert_args(ls, lkb, ms);
3963 setup_local_lkb(ls, ms);
3964 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3968 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
3974 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3979 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3999 error = receive_unlock_args(ls, lkb, ms);
4015 setup_local_lkb(ls, ms);
4016 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4020 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4026 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4051 setup_local_lkb(ls, ms);
4052 send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4056 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4062 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4087 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4093 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4115 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4124 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4129 receive_request(ls, ms);
4132 send_lookup_reply(ls, ms, ret_nodeid, error);
4135 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4147 log_error(ls, "receive_remove from %d bad len %d",
4152 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4154 log_error(ls, "receive_remove from %d bad nodeid %d",
4172 b = hash & (ls->ls_rsbtbl_size - 1);
4174 spin_lock(&ls->ls_rsbtbl[b].lock);
4176 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4179 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4182 log_error(ls, "receive_remove from %d not found %s",
4184 spin_unlock(&ls->ls_rsbtbl[b].lock);
4189 log_error(ls, "receive_remove keep from %d master %d",
4192 spin_unlock(&ls->ls_rsbtbl[b].lock);
4196 log_debug(ls, "receive_remove from %d master %d first %x %s",
4199 spin_unlock(&ls->ls_rsbtbl[b].lock);
4204 log_error(ls, "receive_remove toss from %d master %d",
4207 spin_unlock(&ls->ls_rsbtbl[b].lock);
4212 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4213 spin_unlock(&ls->ls_rsbtbl[b].lock);
4216 log_error(ls, "receive_remove from %d rsb ref error",
4219 spin_unlock(&ls->ls_rsbtbl[b].lock);
4223 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4225 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4228 static int receive_request_reply(struct dlm_ls *ls,
4236 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4251 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4296 log_limit(ls, "receive_request_reply %x from %d %d "
4323 log_error(ls, "receive_request_reply %x error %d",
4329 log_debug(ls, "receive_request_reply %x result %d unlock",
4336 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4418 static int receive_convert_reply(struct dlm_ls *ls,
4424 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4470 static int receive_unlock_reply(struct dlm_ls *ls,
4476 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4523 static int receive_cancel_reply(struct dlm_ls *ls,
4529 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4538 static void receive_lookup_reply(struct dlm_ls *ls,
4546 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4548 log_error(ls, "%s no lkid %x", __func__,
4574 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4588 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4600 log_debug(ls, "receive_lookup_reply %x unlock %x",
4618 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4623 if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4624 log_limit(ls, "receive %d from non-member %d %x %x %d",
4637 error = receive_request(ls, ms);
4641 error = receive_convert(ls, ms);
4645 error = receive_unlock(ls, ms);
4650 error = receive_cancel(ls, ms);
4656 error = receive_request_reply(ls, ms);
4660 error = receive_convert_reply(ls, ms);
4664 error = receive_unlock_reply(ls, ms);
4668 error = receive_cancel_reply(ls, ms);
4675 error = receive_grant(ls, ms);
4680 error = receive_bast(ls, ms);
4686 receive_lookup(ls, ms);
4690 receive_remove(ls, ms);
4696 receive_lookup_reply(ls, ms);
4702 receive_purge(ls, ms);
4706 log_error(ls, "unknown message type %d",
4722 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4727 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4733 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4737 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4754 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4757 if (dlm_locking_stopped(ls)) {
4761 if (WARN_ON_ONCE(!ls->ls_generation)) {
4762 log_limit(ls, "receive %d from %d ignore old gen",
4767 dlm_add_requestqueue(ls, nodeid, ms);
4769 dlm_wait_requestqueue(ls);
4770 _receive_message(ls, ms, 0);
4777 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4780 _receive_message(ls, ms, saved_seq);
4791 struct dlm_ls *ls;
4813 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4814 if (!ls) {
4828 be inactive (in this ls) before transitioning to recovery mode */
4830 down_read(&ls->ls_recv_active);
4832 dlm_receive_message(ls, &p->message, nodeid);
4834 dlm_receive_rcom(ls, &p->rcom, nodeid);
4836 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4838 up_read(&ls->ls_recv_active);
4840 dlm_put_lockspace(ls);
4843 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4870 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4873 if (dlm_no_directory(ls))
4876 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4888 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4899 mutex_lock(&ls->ls_waiters_mutex);
4901 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4909 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4928 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4952 log_debug(ls, "rwpre overlap %x %x %d %d %d",
4964 recover_convert_waiter(ls, lkb, ms_local);
4988 log_error(ls, "invalid lkb wait_type %d %d",
4993 mutex_unlock(&ls->ls_waiters_mutex);
4997 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5001 mutex_lock(&ls->ls_waiters_mutex);
5002 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5009 mutex_unlock(&ls->ls_waiters_mutex);
5030 int dlm_recover_waiters_post(struct dlm_ls *ls)
5037 if (dlm_locking_stopped(ls)) {
5038 log_debug(ls, "recover_waiters_post aborted");
5043 lkb = find_resend_waiter(ls);
5058 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5076 mutex_lock(&ls->ls_waiters_mutex);
5078 mutex_unlock(&ls->ls_waiters_mutex);
5117 log_error(ls, "waiter %x msg %d r_nodeid %d "
5130 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5142 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5149 log_error(ls, "purged mstcpy lkb not released");
5155 struct dlm_ls *ls = r->res_ls;
5157 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5158 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5159 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5162 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5173 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5186 log_error(ls, "purged dead lkb not released");
5197 void dlm_recover_purge(struct dlm_ls *ls)
5208 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5216 down_write(&ls->ls_root_sem);
5217 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5221 purge_dead_list(ls, r, &r->res_grantqueue,
5223 purge_dead_list(ls, r, &r->res_convertqueue,
5225 purge_dead_list(ls, r, &r->res_waitqueue,
5232 up_write(&ls->ls_root_sem);
5235 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5239 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5244 spin_lock(&ls->ls_rsbtbl[bucket].lock);
5245 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5255 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5258 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5279 void dlm_recover_grant(struct dlm_ls *ls)
5288 r = find_grant_rsb(ls, bucket);
5290 if (bucket == ls->ls_rsbtbl_size - 1)
5309 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5343 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5365 if (lvblen > ls->ls_lvblen)
5367 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5394 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5422 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5429 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5430 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5442 error = create_lkb(ls, &lkb);
5446 error = receive_rcom_lock_args(ls, lkb, r, rc);
5448 __put_lkb(ls, lkb);
5454 ls->ls_recover_locks_in++;
5464 lkb->lkb_recover_seq = ls->ls_recover_seq;
5471 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5478 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5491 error = find_lkb(ls, lkid, &lkb);
5493 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5504 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5520 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5531 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5547 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5555 dlm_lock_recovery(ls);
5557 error = create_lkb(ls, &lkb);
5563 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5586 error = request_lock(ls, lkb, name, namelen, &args);
5608 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5610 __put_lkb(ls, lkb);
5612 dlm_unlock_recovery(ls);
5616 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5624 dlm_lock_recovery(ls);
5626 error = find_lkb(ls, lkid, &lkb);
5630 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5659 error = convert_lock(ls, lkb, &args);
5664 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5667 dlm_unlock_recovery(ls);
5678 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5687 mutex_lock(&ls->ls_orphans_mutex);
5688 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5704 mutex_unlock(&ls->ls_orphans_mutex);
5743 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5751 dlm_lock_recovery(ls);
5753 error = find_lkb(ls, lkid, &lkb);
5757 trace_dlm_unlock_start(ls, lkb, flags);
5771 error = unlock_lock(ls, lkb, &args);
5787 trace_dlm_unlock_end(ls, lkb, flags, error);
5790 dlm_unlock_recovery(ls);
5795 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5803 dlm_lock_recovery(ls);
5805 error = find_lkb(ls, lkid, &lkb);
5809 trace_dlm_unlock_start(ls, lkb, flags);
5820 error = cancel_lock(ls, lkb, &args);
5828 trace_dlm_unlock_end(ls, lkb, flags, error);
5831 dlm_unlock_recovery(ls);
5836 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5844 dlm_lock_recovery(ls);
5846 error = find_lkb(ls, lkid, &lkb);
5850 trace_dlm_unlock_start(ls, lkb, flags);
5880 trace_dlm_unlock_end(ls, lkb, flags, error);
5883 dlm_unlock_recovery(ls);
5890 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5896 mutex_lock(&ls->ls_orphans_mutex);
5897 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5898 mutex_unlock(&ls->ls_orphans_mutex);
5902 error = cancel_lock(ls, lkb, &args);
5913 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5921 error = unlock_lock(ls, lkb, &args);
5931 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5936 spin_lock(&ls->ls_clear_proc_locks);
5948 spin_unlock(&ls->ls_clear_proc_locks);
5962 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5966 dlm_lock_recovery(ls);
5969 lkb = del_proc_lock(ls, proc);
5973 orphan_proc_lock(ls, lkb);
5975 unlock_proc_lock(ls, lkb);
5984 spin_lock(&ls->ls_clear_proc_locks);
5999 spin_unlock(&ls->ls_clear_proc_locks);
6000 dlm_unlock_recovery(ls);
6003 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6021 unlock_proc_lock(ls, lkb);
6044 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6048 mutex_lock(&ls->ls_orphans_mutex);
6049 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6052 unlock_proc_lock(ls, lkb);
6056 mutex_unlock(&ls->ls_orphans_mutex);
6059 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6065 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6075 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6081 error = send_purge(ls, nodeid, pid);
6083 dlm_lock_recovery(ls);
6085 purge_proc_locks(ls, proc);
6087 do_purge(ls, nodeid, pid);
6088 dlm_unlock_recovery(ls);
6094 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6110 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6123 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6126 __put_lkb(ls, lkb);
6139 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6145 error = find_lkb(ls, lkb_id, &lkb);