Lines matching refs:dlm (each entry: source line number, then the matching line)

40 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
44 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
50 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
55 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
60 if (dlm != mle->dlm)
77 struct dlm_ctxt *dlm,
83 static int dlm_find_mle(struct dlm_ctxt *dlm,
91 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
95 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
99 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
106 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
108 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
110 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
113 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
152 * dlm's established heartbeat callbacks. the mle is attached
153 * when it is created, and since the dlm->spinlock is held at
156 * dlm->mle_hb_events list as soon as heartbeat events are no
163 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
166 assert_spin_locked(&dlm->spinlock);
168 list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
172 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
180 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
183 spin_lock(&dlm->spinlock);
184 __dlm_mle_detach_hb_events(dlm, mle);
185 spin_unlock(&dlm->spinlock);
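
The fragments at 152-185 describe the mle/heartbeat coupling: an mle is attached to dlm->mle_hb_events when it is created, under dlm->spinlock, and detached as soon as heartbeat events no longer matter to it. A minimal sketch of that attach/detach pattern, reconstructed from the lines above (illustrative, not a verbatim excerpt):

        static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
                                        struct dlm_master_list_entry *mle)
        {
                assert_spin_locked(&dlm->spinlock);     /* caller holds the lock */
                list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
        }

        static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                        struct dlm_master_list_entry *mle)
        {
                /* idempotent: list_del_init() leaves the node re-deletable */
                if (!list_empty(&mle->hb_events))
                        list_del_init(&mle->hb_events);
        }

        static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                        struct dlm_master_list_entry *mle)
        {
                spin_lock(&dlm->spinlock);      /* locked wrapper, lines 183-185 */
                __dlm_mle_detach_hb_events(dlm, mle);
                spin_unlock(&dlm->spinlock);
        }
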
190 struct dlm_ctxt *dlm;
191 dlm = mle->dlm;
193 assert_spin_locked(&dlm->spinlock);
194 assert_spin_locked(&dlm->master_lock);
201 struct dlm_ctxt *dlm;
202 dlm = mle->dlm;
204 spin_lock(&dlm->spinlock);
205 spin_lock(&dlm->master_lock);
208 spin_unlock(&dlm->master_lock);
209 spin_unlock(&dlm->spinlock);
216 struct dlm_ctxt *dlm;
217 dlm = mle->dlm;
219 assert_spin_locked(&dlm->spinlock);
220 assert_spin_locked(&dlm->master_lock);
235 struct dlm_ctxt *dlm;
236 dlm = mle->dlm;
238 spin_lock(&dlm->spinlock);
239 spin_lock(&dlm->master_lock);
241 spin_unlock(&dlm->master_lock);
242 spin_unlock(&dlm->spinlock);
252 struct dlm_ctxt *dlm,
257 assert_spin_locked(&dlm->spinlock);
259 mle->dlm = dlm;
291 atomic_inc(&dlm->mle_tot_count[mle->type]);
292 atomic_inc(&dlm->mle_cur_count[mle->type]);
295 memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
296 memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
297 clear_bit(dlm->node_num, mle->vote_map);
298 clear_bit(dlm->node_num, mle->node_map);
301 __dlm_mle_attach_hb_events(dlm, mle);
304 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
306 assert_spin_locked(&dlm->spinlock);
307 assert_spin_locked(&dlm->master_lock);
313 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
317 assert_spin_locked(&dlm->master_lock);
319 bucket = dlm_master_hash(dlm, mle->mnamehash);
324 static int dlm_find_mle(struct dlm_ctxt *dlm,
332 assert_spin_locked(&dlm->master_lock);
335 bucket = dlm_master_hash(dlm, hash);
337 if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
346 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
350 assert_spin_locked(&dlm->spinlock);
352 list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
354 dlm_mle_node_up(dlm, mle, NULL, idx);
356 dlm_mle_node_down(dlm, mle, NULL, idx);
360 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
374 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
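
Lines 346-356 show how heartbeat events fan out: every attached mle gets a node-up or node-down callback. A condensed sketch of that dispatcher, assuming node_up is a plain flag:

        void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
        {
                struct dlm_master_list_entry *mle;

                assert_spin_locked(&dlm->spinlock);

                list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
                        if (node_up)
                                dlm_mle_node_up(dlm, mle, NULL, idx);   /* idx joined */
                        else
                                dlm_mle_node_down(dlm, mle, NULL, idx); /* idx died */
                }
        }
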
408 struct dlm_ctxt *dlm;
411 dlm = mle->dlm;
413 assert_spin_locked(&dlm->spinlock);
414 assert_spin_locked(&dlm->master_lock);
420 __dlm_unlink_mle(dlm, mle);
423 __dlm_mle_detach_hb_events(dlm, mle);
425 atomic_dec(&dlm->mle_cur_count[mle->type]);
469 struct dlm_ctxt *dlm;
472 dlm = res->dlm;
481 atomic_dec(&dlm->res_cur_count);
525 static void dlm_init_lockres(struct dlm_ctxt *dlm,
556 res->dlm = dlm;
560 atomic_inc(&dlm->res_tot_count);
561 atomic_inc(&dlm->res_cur_count);
565 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
572 spin_lock(&dlm->track_lock);
573 list_add_tail(&res->tracking, &dlm->tracking_list);
574 spin_unlock(&dlm->track_lock);
580 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
594 dlm_init_lockres(dlm, res, name, namelen);
603 void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
614 void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
625 static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
630 mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
635 void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
639 __dlm_lockres_grab_inflight_ref(dlm, res);
642 void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
651 mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
658 void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
664 dlm->name, res->lockname.len, res->lockname.name,
668 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
675 dlm->name, res->lockname.len, res->lockname.name,
679 static void dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
683 __dlm_lockres_drop_inflight_worker(dlm, res);
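
The inflight counters referenced at 625-683 pin a lockres while messages or queued work still name it, so the purge path cannot free it underneath them. A hedged sketch of the grab/drop pair, assuming the counter is res->inflight_locks as in dlmcommon.h:

        static void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
                                                    struct dlm_lock_resource *res)
        {
                assert_spin_locked(&res->spinlock);
                res->inflight_locks++;          /* pin: something references res */
                mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
                     res->lockname.len, res->lockname.name, res->inflight_locks,
                     __builtin_return_address(0));
        }

        void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
                                           struct dlm_lock_resource *res)
        {
                spin_lock(&res->spinlock);
                BUG_ON(res->inflight_locks == 0);
                res->inflight_locks--;          /* unpin; purge may proceed at 0 */
                mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
                     res->lockname.len, res->lockname.name, res->inflight_locks,
                     __builtin_return_address(0));
                wake_up(&res->wq);
                spin_unlock(&res->spinlock);
        }
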
695 * also, do a lookup in the dlm->master_list to see
703 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
725 spin_lock(&dlm->spinlock);
726 tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
728 spin_unlock(&dlm->spinlock);
732 * Right after dlm spinlock was released, dlm_thread could have
755 BUG_ON(tmpres->owner == dlm->node_num);
765 dlm_lockres_grab_inflight_ref(dlm, tmpres);
769 spin_lock(&dlm->track_lock);
777 spin_unlock(&dlm->track_lock);
785 spin_unlock(&dlm->spinlock);
791 res = dlm_new_lockres(dlm, lockid, namelen);
803 dlm_change_lockres_owner(dlm, res, dlm->node_num);
804 __dlm_insert_lockres(dlm, res);
805 dlm_lockres_grab_inflight_ref(dlm, res);
807 spin_unlock(&dlm->spinlock);
813 spin_lock(&dlm->master_lock);
816 blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
833 BUG_ON(mig && mle->master == dlm->node_num);
837 dlm->name, namelen, lockid,
839 spin_unlock(&dlm->master_lock);
840 spin_unlock(&dlm->spinlock);
844 dlm_mle_detach_hb_events(dlm, mle);
858 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
859 set_bit(dlm->node_num, mle->maybe_map);
860 __dlm_insert_mle(dlm, mle);
862 /* still holding the dlm spinlock, check the recovery map
866 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
870 dlm->name, namelen, (char *)lockid, bit);
881 __dlm_insert_lockres(dlm, res);
884 __dlm_lockres_grab_inflight_ref(dlm, res);
891 spin_unlock(&dlm->master_lock);
892 spin_unlock(&dlm->spinlock);
897 * dlm spinlock would be detectable by a change on the mle,
901 "master $RECOVERY lock now\n", dlm->name);
902 if (!dlm_pre_master_reco_lockres(dlm, res))
906 "change\n", dlm->name);
912 dlm_kick_recovery_thread(dlm);
914 dlm_wait_for_recovery(dlm);
916 spin_lock(&dlm->spinlock);
917 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
921 dlm->name, namelen, (char *)lockid, bit);
925 spin_unlock(&dlm->spinlock);
928 dlm_wait_for_node_recovery(dlm, bit, 10000);
950 "master is %u, keep going\n", dlm->name, namelen,
957 ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
961 "request now, blocked=%d\n", dlm->name, res->lockname.len,
966 dlm->name, res->lockname.len,
975 mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
981 dlm_mle_detach_hb_events(dlm, mle);
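
Lines 703-981 trace dlm_get_lock_resource: look the name up under dlm->spinlock; on a miss, allocate the lockres, insert a DLM_MLE_MASTER entry so the master search is tracked, then wait for mastery to settle. A condensed, error-path-free sketch of that flow:

        spin_lock(&dlm->spinlock);
        tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
        if (tmpres) {
                spin_unlock(&dlm->spinlock);
                dlm_lockres_grab_inflight_ref(dlm, tmpres);     /* fast path */
                return tmpres;
        }

        /* slow path: brand-new lockres, master unknown */
        spin_lock(&dlm->master_lock);
        if (!dlm_find_mle(dlm, &mle, (char *)lockid, namelen)) {
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
                set_bit(dlm->node_num, mle->maybe_map); /* we may become master */
                __dlm_insert_mle(dlm, mle);
        }
        __dlm_insert_lockres(dlm, res);
        __dlm_lockres_grab_inflight_ref(dlm, res);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

        /* send master requests, then block until mastery is resolved */
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
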
1003 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1020 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1025 if (res->owner != dlm->node_num) {
1050 dlm->name, res->lockname.len, res->lockname.name);
1051 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1055 dlm->name, res->lockname.len, res->lockname.name,
1065 "rechecking now\n", dlm->name, res->lockname.len,
1071 "for %s:%.*s\n", dlm->name, res->lockname.len,
1085 if (dlm->node_num <= bit) {
1089 mle->master = dlm->node_num;
1110 mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1121 m = dlm->node_num;
1124 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1142 dlm_change_lockres_owner(dlm, res, m);
1208 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1255 "now\n", dlm->name,
1271 dlm->name,
1289 set_bit(dlm->node_num, mle->maybe_map);
1311 struct dlm_ctxt *dlm = mle->dlm;
1316 request.node_idx = dlm->node_num;
1324 ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1360 "reference\n", dlm->name, res->lockname.len,
1395 * dlm->spinlock
1398 * dlm->master_list
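
The comment fragments at 1395-1398 list the locks a handler may take. Throughout this file the two domain-wide locks nest in a fixed order: dlm->spinlock outermost, dlm->master_lock inside it, released in reverse (compare lines 204-209, 813-840, 2582-2594). As a sketch:

        spin_lock(&dlm->spinlock);       /* outer: domain map, lockres hash */
        spin_lock(&dlm->master_lock);    /* inner: mle master hash */
        /* ... look up or insert a lockres/mle pair ... */
        spin_unlock(&dlm->master_lock);  /* inner lock released first */
        spin_unlock(&dlm->spinlock);
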
1406 struct dlm_ctxt *dlm = data;
1417 if (!dlm_grab(dlm))
1420 if (!dlm_domain_fully_joined(dlm)) {
1435 spin_lock(&dlm->spinlock);
1436 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1438 spin_unlock(&dlm->spinlock);
1444 * Right after dlm spinlock was released, dlm_thread could have
1465 if (res->owner == dlm->node_num) {
1466 dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
1499 spin_lock(&dlm->master_lock);
1500 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1514 if (tmpmle->master == dlm->node_num) {
1525 if (tmpmle->master == dlm->node_num) {
1531 dlm_lockres_set_refmap_bit(dlm, res,
1544 spin_unlock(&dlm->master_lock);
1560 spin_lock(&dlm->master_lock);
1561 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1566 spin_unlock(&dlm->master_lock);
1567 spin_unlock(&dlm->spinlock);
1580 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1582 __dlm_insert_mle(dlm, mle);
1586 if (tmpmle->master == dlm->node_num) {
1602 spin_unlock(&dlm->master_lock);
1603 spin_unlock(&dlm->spinlock);
1618 dlm->node_num, res->lockname.len, res->lockname.name);
1620 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1629 __dlm_lockres_grab_inflight_worker(dlm, res);
1638 dlm_put(dlm);
1652 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1682 assert.node_idx = dlm->node_num;
1687 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1692 DLM_ASSERT_MASTER_MSG, dlm->key, to);
1706 spin_lock(&dlm->spinlock);
1707 spin_lock(&dlm->master_lock);
1708 if (dlm_find_mle(dlm, &mle, (char *)lockname,
1713 spin_unlock(&dlm->master_lock);
1714 spin_unlock(&dlm->spinlock);
1736 dlm_lockres_set_refmap_bit(dlm, res, to);
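
dlm_do_assert_master (1652-1736) broadcasts DLM_ASSERT_MASTER_MSG to each node in the map and reacts to the response status; a mastery-ref response makes the sender keep a refmap bit for that node. A condensed sketch of the send loop (response decoding simplified; the flag name is from dlmcommon.h as I recall and should be treated as an assumption):

        memset(&assert, 0, sizeof(assert));
        assert.node_idx = dlm->node_num;        /* I claim mastery */
        assert.namelen = namelen;
        memcpy(assert.name, lockname, namelen);

        dlm_node_iter_init(nodemap, &iter);
        while ((to = dlm_node_iter_next(&iter)) >= 0) {
                r = 0;
                tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
                                            &assert, sizeof(assert), to, &r);
                if (tmpret < 0) {
                        mlog(ML_ERROR, "Error %d when sending message %u (key "
                             "0x%x) to node %u\n", tmpret,
                             DLM_ASSERT_MASTER_MSG, dlm->key, to);
                        continue;
                }
                if (r & DLM_ASSERT_RESPONSE_MASTERY_REF)        /* assumed flag */
                        dlm_lockres_set_refmap_bit(dlm, res, to);
        }
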
1754 * dlm->spinlock
1757 * dlm->master_list
1764 struct dlm_ctxt *dlm = data;
1774 if (!dlm_grab(dlm))
1787 spin_lock(&dlm->spinlock);
1793 spin_lock(&dlm->master_lock);
1794 if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1827 dlm->name, namelen, name,
1832 dlm->name, namelen, name,
1835 spin_unlock(&dlm->master_lock);
1836 spin_unlock(&dlm->spinlock);
1841 spin_unlock(&dlm->master_lock);
1845 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1918 if (nn != dlm->node_num && nn != assert->node_idx) {
1936 dlm->node_num, mle->new_master);
1939 dlm_change_lockres_owner(dlm, res, mle->new_master);
1942 dlm_change_lockres_owner(dlm, res, mle->master);
1953 spin_lock(&dlm->master_lock);
1970 "inuse=%d\n", dlm->name, namelen, name,
1974 __dlm_unlink_mle(dlm, mle);
1975 __dlm_mle_detach_hb_events(dlm, mle);
1984 spin_unlock(&dlm->master_lock);
1992 spin_unlock(&dlm->spinlock);
2002 dlm_put(dlm);
2010 assert->node_idx, dlm->name, namelen, name);
2017 dlm->name, namelen, name, assert->node_idx);
2027 spin_lock(&dlm->master_lock);
2030 spin_unlock(&dlm->master_lock);
2031 spin_unlock(&dlm->spinlock);
2033 dlm_put(dlm);
2051 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2062 dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2073 spin_lock(&dlm->work_lock);
2074 list_add_tail(&item->list, &dlm->work_list);
2075 spin_unlock(&dlm->work_lock);
2077 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
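
dlm_dispatch_assert_master (2051-2077) never sends from the handler context; it packages the assert into a work item and hands it to the dlm worker thread. A sketch of that deferral (the u.am field names are assumptions):

        struct dlm_work_item *item;

        item = kzalloc(sizeof(*item), GFP_ATOMIC);      /* handler context */
        if (!item)
                return -ENOMEM;

        /* dlm_assert_master_worker() runs later on dlm->dlm_worker */
        dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
        item->u.am.lockres = res;                       /* assumed naming */
        item->u.am.request_from = request_from;
        item->u.am.flags = flags;

        spin_lock(&dlm->work_lock);
        list_add_tail(&item->list, &dlm->work_list);
        spin_unlock(&dlm->work_lock);

        queue_work(dlm->dlm_worker, &dlm->dispatched_work);
        return 0;
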
2083 struct dlm_ctxt *dlm = data;
2092 dlm = item->dlm;
2098 spin_lock(&dlm->spinlock);
2099 memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2100 spin_unlock(&dlm->spinlock);
2102 clear_bit(dlm->node_num, nodemap);
2108 bit = dlm->node_num;
2138 res->lockname.len, res->lockname.name, dlm->node_num);
2139 ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2147 dlm_lockres_release_ast(dlm, res);
2150 dlm_lockres_drop_inflight_worker(dlm, res);
2167 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2175 spin_lock(&dlm->spinlock);
2176 dlm_node_iter_init(dlm->domain_map, &iter);
2177 spin_unlock(&dlm->spinlock);
2181 if (nodenum == dlm->node_num)
2183 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2195 spin_lock(&dlm->spinlock);
2196 if (test_bit(master, dlm->recovery_map)) {
2200 "lock. must wait.\n", dlm->name,
2204 spin_unlock(&dlm->spinlock);
2205 mlog(0, "%s: reco lock master is %u\n", dlm->name,
2217 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2229 deref.node_idx = dlm->node_num;
2233 ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2237 dlm->name, namelen, lockname, ret, res->owner);
2241 dlm->name, namelen, lockname, res->owner, r);
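
dlm_drop_lockres_ref (2217-2241) tells the owner that this node no longer references the lockres, so the owner can clear our refmap bit. A condensed sketch of the message build-and-send per the fragments above:

        struct dlm_deref_lockres deref;
        int ret, r = 0;

        memset(&deref, 0, sizeof(deref));
        deref.node_idx = dlm->node_num;         /* who is dropping the ref */
        deref.namelen = namelen;
        memcpy(deref.name, lockname, namelen);

        ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
                                 &deref, sizeof(deref), res->owner, &r);
        if (ret < 0)
                mlog(ML_ERROR, "%s: res %.*s, error %d send DEREF to node %u\n",
                     dlm->name, namelen, lockname, ret, res->owner);
        else if (r < 0)
                mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
                     dlm->name, namelen, lockname, res->owner, r);
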
2254 struct dlm_ctxt *dlm = data;
2266 if (!dlm_grab(dlm))
2284 spin_lock(&dlm->spinlock);
2285 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2287 spin_unlock(&dlm->spinlock);
2289 dlm->name, namelen, name);
2292 spin_unlock(&dlm->spinlock);
2300 dlm_lockres_clear_refmap_bit(dlm, res, node);
2308 dlm_lockres_calc_usage(dlm, res);
2311 "but it is already dropped!\n", dlm->name,
2326 dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2330 spin_lock(&dlm->work_lock);
2331 list_add_tail(&item->list, &dlm->work_list);
2332 spin_unlock(&dlm->work_lock);
2334 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2340 dlm_put(dlm);
2348 struct dlm_ctxt *dlm = data;
2358 if (!dlm_grab(dlm))
2376 spin_lock(&dlm->spinlock);
2377 res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2379 spin_unlock(&dlm->spinlock);
2381 dlm->name, namelen, name);
2388 spin_unlock(&dlm->spinlock);
2390 "but it is already derefed!\n", dlm->name,
2396 __dlm_do_purge_lockres(dlm, res);
2400 spin_unlock(&dlm->spinlock);
2406 dlm_put(dlm);
2410 static void dlm_drop_lockres_ref_done(struct dlm_ctxt *dlm,
2423 deref.node_idx = dlm->node_num;
2427 ret = o2net_send_message(DLM_DEREF_LOCKRES_DONE, dlm->key,
2431 " to node %u\n", dlm->name, namelen,
2436 dlm->name, namelen, lockname, node, r);
2443 struct dlm_ctxt *dlm;
2448 dlm = item->dlm;
2456 dlm_lockres_clear_refmap_bit(dlm, res, node);
2461 dlm_drop_lockres_ref_done(dlm, res, node);
2465 dlm->name, res->lockname.len, res->lockname.name, node);
2466 dlm_lockres_calc_usage(dlm, res);
2469 "but it is already dropped!\n", dlm->name,
2484 static int dlm_is_lockres_migratable(struct dlm_ctxt *dlm,
2504 if (res->owner != dlm->node_num)
2510 if (lock->ml.node != dlm->node_num) {
2516 "%s list\n", dlm->name, res->lockname.len,
2531 mlog(0, "%s: res %.*s, Migratable\n", dlm->name, res->lockname.len,
2542 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2554 if (!dlm_grab(dlm))
2560 mlog(0, "%s: Migrating %.*s to node %u\n", dlm->name, namelen, name,
2582 spin_lock(&dlm->spinlock);
2583 spin_lock(&dlm->master_lock);
2584 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2585 namelen, target, dlm->node_num);
2593 spin_unlock(&dlm->master_lock);
2594 spin_unlock(&dlm->spinlock);
2606 if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2620 dlm_mle_detach_hb_events(dlm, oldmle);
2626 dlm_mle_detach_hb_events(dlm, mle);
2644 flush_workqueue(dlm->dlm_worker);
2650 ret = dlm_send_one_lockres(dlm, res, mres, target,
2657 dlm_mle_detach_hb_events(dlm, mle);
2665 dlm_wait_for_node_death(dlm, target,
2693 dlm->name, res->lockname.len, res->lockname.name);
2696 if (dlm_is_node_dead(dlm, target)) {
2699 dlm->name, res->lockname.len,
2703 dlm_mle_detach_hb_events(dlm, mle);
2714 dlm->name, res->lockname.len, res->lockname.name);
2719 dlm_set_lockres_owner(dlm, res, target);
2721 dlm_remove_nonlocal_locks(dlm, res);
2726 dlm_mle_detach_hb_events(dlm, mle);
2730 dlm_lockres_calc_usage(dlm, res);
2735 dlm_kick_thread(dlm, res);
2745 dlm_put(dlm);
2747 mlog(0, "%s: Migrating %.*s to %u, returns %d\n", dlm->name, namelen,
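
Lines 2542-2747 trace dlm_migrate_lockres end to end: register a migration mle, mark the lockres migrating, drain the worker so no stale asserts are queued, push the lock state to the target, then flip ownership locally. Condensed, with error paths elided (DLM_MRES_MIGRATION per dlmcommon.h):

        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
                                    namelen, target, dlm->node_num);
        if (dlm_mark_lockres_migrating(dlm, res, target) < 0)
                goto fail;

        flush_workqueue(dlm->dlm_worker);       /* drain queued work first */

        ret = dlm_send_one_lockres(dlm, res, mres, target,
                                   DLM_MRES_MIGRATION);

        /* once the target has asserted mastery, we stop being owner */
        dlm_set_lockres_owner(dlm, res, target);
        dlm_remove_nonlocal_locks(dlm, res);
        dlm_lockres_calc_usage(dlm, res);
        dlm_kick_thread(dlm, res);
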
2757 * Called with the dlm spinlock held, may drop it to do migration, but
2760 * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped
2762 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2763 __must_hold(&dlm->spinlock)
2769 assert_spin_locked(&dlm->spinlock);
2772 if (dlm_is_lockres_migratable(dlm, res))
2773 target = dlm_pick_migration_target(dlm, res);
2780 spin_unlock(&dlm->spinlock);
2782 ret = dlm_migrate_lockres(dlm, res, target);
2785 dlm->name, res->lockname.len, res->lockname.name,
2787 spin_lock(&dlm->spinlock);
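
The contract spelled out at 2757-2763 matters to callers: a return of 1 means dlm->spinlock was dropped and retaken inside dlm_empty_lockres, so any list walk over lock resources must restart. A hypothetical caller loop honoring that ('resources' and 'some_list' are placeholders, not real names):

        spin_lock(&dlm->spinlock);
restart:
        list_for_each_entry(res, &resources, some_list) {
                if (dlm_empty_lockres(dlm, res))
                        goto restart;   /* lock was dropped; list may have changed */
        }
        spin_unlock(&dlm->spinlock);
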
2792 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2795 spin_lock(&dlm->ast_lock);
2799 spin_unlock(&dlm->ast_lock);
2803 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2814 spin_lock(&dlm->spinlock);
2815 if (!test_bit(mig_target, dlm->domain_map))
2817 spin_unlock(&dlm->spinlock);
2821 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2832 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2839 res->lockname.len, res->lockname.name, dlm->node_num,
2852 dlm_kick_thread(dlm, res);
2860 wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2861 dlm_lockres_release_ast(dlm, res);
2869 ret = wait_event_interruptible_timeout(dlm->migration_wq,
2870 dlm_migration_can_proceed(dlm, res, target),
2875 test_bit(target, dlm->domain_map) ? "no":"yes");
2879 test_bit(target, dlm->domain_map) ? "no":"yes");
2881 if (!dlm_migration_can_proceed(dlm, res, target)) {
2888 spin_lock(&dlm->spinlock);
2889 if (!test_bit(target, dlm->domain_map)) {
2894 spin_unlock(&dlm->spinlock);
2924 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2933 BUG_ON(res->owner == dlm->node_num);
2937 if (lock->ml.node != dlm->node_num) {
2945 dlm_lockres_clear_refmap_bit(dlm, res,
2963 if (bit != dlm->node_num) {
2965 "migrating lockres, clearing\n", dlm->name,
2967 dlm_lockres_clear_refmap_bit(dlm, res, bit);
2978 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2987 assert_spin_locked(&dlm->spinlock);
2994 if (lock->ml.node == dlm->node_num)
2996 if (test_bit(lock->ml.node, dlm->exit_domain_map))
3010 if (noderef == dlm->node_num)
3012 if (test_bit(noderef, dlm->exit_domain_map))
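
dlm_pick_migration_target (2978-3012) prefers a node that already holds a lock on the resource, skipping this node and anyone in exit_domain_map; failing that it falls back to nodes set in the refmap (the noderef loop). A condensed sketch of the first pass, where queue stands for each of the granted/converting/blocked lists in turn:

        list_for_each_entry(lock, queue, list) {
                if (lock->ml.node == dlm->node_num)
                        continue;       /* cannot migrate to ourselves */
                if (test_bit(lock->ml.node, dlm->exit_domain_map))
                        continue;       /* node is leaving the domain */
                return lock->ml.node;   /* live node with an interest in res */
        }
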
3024 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
3048 spin_lock(&dlm->spinlock);
3049 skip = (!test_bit(nodenum, dlm->domain_map));
3050 spin_unlock(&dlm->spinlock);
3056 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
3061 "MIGRATE_REQUEST to node %u\n", dlm->name,
3078 dlm->name, res->lockname.len, res->lockname.name,
3081 dlm_lockres_set_refmap_bit(dlm, res, nodenum);
3104 struct dlm_ctxt *dlm = data;
3112 if (!dlm_grab(dlm))
3128 spin_lock(&dlm->spinlock);
3129 res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3147 spin_lock(&dlm->master_lock);
3149 ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3157 spin_unlock(&dlm->master_lock);
3159 spin_unlock(&dlm->spinlock);
3163 dlm_mle_detach_hb_events(dlm, oldmle);
3170 dlm_put(dlm);
3174 /* must be holding dlm->spinlock and dlm->master_lock
3181 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3193 assert_spin_locked(&dlm->spinlock);
3194 assert_spin_locked(&dlm->master_lock);
3197 found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3202 if (master == dlm->node_num) {
3226 __dlm_unlink_mle(dlm, tmp);
3227 __dlm_mle_detach_hb_events(dlm, tmp);
3233 "migration\n", dlm->name,
3242 dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3249 __dlm_insert_mle(dlm, mle);
3257 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3263 res = __dlm_lookup_lockres(dlm, mle->mname, mle->mnamelen,
3266 spin_unlock(&dlm->master_lock);
3270 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3271 dlm_move_lockres_to_recovery_list(dlm, res);
3276 __dlm_mle_detach_hb_events(dlm, mle);
3279 spin_lock(&dlm->master_lock);
3281 spin_unlock(&dlm->master_lock);
3287 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3290 __dlm_mle_detach_hb_events(dlm, mle);
3293 __dlm_unlink_mle(dlm, mle);
3300 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3324 __dlm_mle_detach_hb_events(dlm, mle);
3329 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3337 mlog(0, "dlm=%s, dead node=%u\n", dlm->name, dead_node);
3339 assert_spin_locked(&dlm->spinlock);
3342 spin_lock(&dlm->master_lock);
3344 bucket = dlm_master_hash(dlm, i);
3360 dlm_clean_block_mle(dlm, mle, dead_node);
3383 dlm->name, dead_node,
3390 dlm_clean_migration_mle(dlm, mle);
3393 "%u to %u!\n", dlm->name, dead_node, mle->master,
3401 res = dlm_reset_mleres_owner(dlm, mle);
3410 spin_unlock(&dlm->master_lock);
3413 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3419 spin_lock(&dlm->spinlock);
3420 dlm_node_iter_init(dlm->domain_map, &iter);
3422 clear_bit(dlm->node_num, iter.node_map);
3423 spin_unlock(&dlm->spinlock);
3429 dlm_lockres_set_refmap_bit(dlm, res, old_master);
3433 ret = dlm_do_migrate_request(dlm, res, old_master,
3434 dlm->node_num, &iter);
3444 ret = dlm_do_assert_master(dlm, res, iter.node_map,
3456 ret = dlm_do_assert_master(dlm, res, iter.node_map,
3468 dlm_set_lockres_owner(dlm, res, dlm->node_num);
3472 dlm_kick_thread(dlm, res);
3511 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3527 wake_up(&dlm->migration_wq);
3530 void dlm_force_free_mles(struct dlm_ctxt *dlm)
3539 * set the dlm state to DLM_CTXT_LEAVING. If any mles are still
3543 spin_lock(&dlm->spinlock);
3544 spin_lock(&dlm->master_lock);
3546 BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
3547 BUG_ON((find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES));
3550 bucket = dlm_master_hash(dlm, i);
3559 __dlm_unlink_mle(dlm, mle);
3560 __dlm_mle_detach_hb_events(dlm, mle);
3564 spin_unlock(&dlm->master_lock);
3565 spin_unlock(&dlm->spinlock);
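
dlm_force_free_mles (3530-3565) runs only at domain teardown: the state must already be DLM_CTXT_LEAVING and the domain map empty, so any mle still hashed is leftover and is simply unlinked and detached. A condensed sketch of that sweep (the hlist iteration macro's argument list varies across kernel versions):

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);

        BUG_ON(dlm->dlm_state != DLM_CTXT_LEAVING);
        BUG_ON(find_next_bit(dlm->domain_map, O2NM_MAX_NODES, 0) < O2NM_MAX_NODES);

        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
                bucket = dlm_master_hash(dlm, i);
                hlist_for_each_entry_safe(mle, tmp, bucket, master_hash_node) {
                        __dlm_unlink_mle(dlm, mle);     /* out of the hash */
                        __dlm_mle_detach_hb_events(dlm, mle);
                }
        }

        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
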