Lines matching refs: lkb (fs/dlm/lock.c)

16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
21 _request_lock(r, lkb)
22 _convert_lock(r, lkb)
23 _unlock_lock(r, lkb)
24 _cancel_lock(r, lkb)
26 do_request(r, lkb)
27 do_convert(r, lkb)
28 do_unlock(r, lkb)
29 do_cancel(r, lkb)
46 given rsb and lkb and queues callbacks.
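
These opening fragments come from the overview comment at the top of the file: every operation passes through the same stages. dlm_lock()/dlm_unlock() split into the four top-level xxx_lock() entry points, which validate arguments and find and lock the rsb; _xxx_lock() then decides whether this node is the resource master, calling do_xxx() locally or send_xxx() for a remote master; do_xxx() is the guts that manipulates the given rsb and lkb and queues callbacks. A toy skeleton of that shape, in userspace C with the kernel's names reused purely as labels:

    #include <stdbool.h>
    #include <stdio.h>

    struct rsb { bool we_are_master; };

    static int do_request(struct rsb *r)       /* stage 4: queue work on master */
    {
        (void)r;
        printf("add lkb to grant/wait queue locally\n");
        return 0;
    }

    static int send_request(struct rsb *r)     /* remote case: message to master */
    {
        (void)r;
        printf("send DLM_MSG_REQUEST to master node\n");
        return 0;
    }

    static int _request_lock(struct rsb *r)    /* stage 3: local vs. remote */
    {
        return r->we_are_master ? do_request(r) : send_request(r);
    }

    static int request_lock(struct rsb *r)     /* stage 2: validate, lock rsb */
    {
        if (!r)
            return -22;                        /* -EINVAL stand-in */
        return _request_lock(r);
    }

    int main(void)
    {
        struct rsb r = { .we_are_master = false };
        return request_lock(&r);
    }
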
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
161 void dlm_print_lkb(struct dlm_lkb *lkb)
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
182 struct dlm_lkb *lkb;
189 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 dlm_print_lkb(lkb);
192 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 dlm_print_lkb(lkb);
195 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 dlm_print_lkb(lkb);
198 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 dlm_print_lkb(lkb);
219 static inline int can_be_queued(struct dlm_lkb *lkb)
221 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
229 static inline int is_demoted(struct dlm_lkb *lkb)
231 return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
234 static inline int is_altmode(struct dlm_lkb *lkb)
236 return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
239 static inline int is_granted(struct dlm_lkb *lkb)
241 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
250 static inline int is_process_copy(struct dlm_lkb *lkb)
252 return lkb->lkb_nodeid &&
253 !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
256 static inline int is_master_copy(struct dlm_lkb *lkb)
258 return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
261 static inline int middle_conversion(struct dlm_lkb *lkb)
263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
269 static inline int down_conversion(struct dlm_lkb *lkb)
271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
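
middle_conversion() and down_conversion() classify a conversion by comparing granted and requested modes numerically (DLM_LOCK_NL=0 up to DLM_LOCK_EX=5), but PR and CW are incomparable in the real mode lattice: PR admits concurrent readers, CW concurrent writers, and neither dominates the other. middle_conversion() special-cases exactly that pair, so a PR<->CW conversion never takes the in-place down-conversion fast path even though the numeric order would call PR->CW a downgrade. A self-contained check using the mode values from linux/dlmconstants.h:

    #include <stdio.h>

    enum { LOCK_IV = -1, LOCK_NL, LOCK_CR, LOCK_CW, LOCK_PR, LOCK_PW, LOCK_EX };

    static int middle_conversion(int gr, int rq)
    {
        /* PR and CW are incomparable, so this pair is neither up nor down */
        return (gr == LOCK_PR && rq == LOCK_CW) ||
               (rq == LOCK_PR && gr == LOCK_CW);
    }

    static int down_conversion(int gr, int rq)
    {
        return !middle_conversion(gr, rq) && rq < gr;
    }

    int main(void)
    {
        printf("PR->CW middle=%d down=%d\n",
               middle_conversion(LOCK_PR, LOCK_CW),
               down_conversion(LOCK_PR, LOCK_CW));   /* middle=1 down=0 */
        printf("EX->NL middle=%d down=%d\n",
               middle_conversion(LOCK_EX, LOCK_NL),
               down_conversion(LOCK_EX, LOCK_NL));   /* middle=0 down=1 */
        return 0;
    }
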
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
276 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
281 return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
284 static inline int is_overlap(struct dlm_lkb *lkb)
286 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
292 if (is_master_copy(lkb))
295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298 test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
301 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
306 queue_cast(r, lkb,
307 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
312 if (is_master_copy(lkb)) {
313 send_bast(r, lkb, rqmode);
315 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
320 * Basic operations on rsb's and lkb's
1173 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1174 The rsb must exist as long as any lkb's for it do. */
1176 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1179 lkb->lkb_resource = r;
1182 static void detach_lkb(struct dlm_lkb *lkb)
1184 if (lkb->lkb_resource) {
1185 put_rsb(lkb->lkb_resource);
1186 lkb->lkb_resource = NULL;
1193 struct dlm_lkb *lkb;
1196 lkb = dlm_allocate_lkb(ls);
1197 if (!lkb)
1200 lkb->lkb_last_bast_mode = -1;
1201 lkb->lkb_nodeid = -1;
1202 lkb->lkb_grmode = DLM_LOCK_IV;
1203 kref_init(&lkb->lkb_ref);
1204 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1205 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1206 INIT_LIST_HEAD(&lkb->lkb_cb_list);
1207 INIT_LIST_HEAD(&lkb->lkb_callbacks);
1208 spin_lock_init(&lkb->lkb_cb_lock);
1209 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1213 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1215 lkb->lkb_id = rv;
1221 dlm_free_lkb(lkb);
1225 *lkb_ret = lkb;
1236 struct dlm_lkb *lkb;
1239 lkb = idr_find(&ls->ls_lkbidr, lkid);
1240 if (lkb)
1241 kref_get(&lkb->lkb_ref);
1244 *lkb_ret = lkb;
1245 return lkb ? 0 : -ENOENT;
1250 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1255 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1258 /* __put_lkb() is used when an lkb may not have an rsb attached to
1261 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1263 uint32_t lkid = lkb->lkb_id;
1266 rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1272 detach_lkb(lkb);
1275 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1276 dlm_free_lvb(lkb->lkb_lvbptr);
1277 dlm_free_lkb(lkb);
1283 int dlm_put_lkb(struct dlm_lkb *lkb)
1287 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1288 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1290 ls = lkb->lkb_resource->res_ls;
1291 return __put_lkb(ls, lkb);
1295 a valid reference to the lkb, so there's no need for locking. */
1297 static inline void hold_lkb(struct dlm_lkb *lkb)
1299 kref_get(&lkb->lkb_ref);
1304 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1306 DLM_ASSERT(false, dlm_print_lkb(lkb););
1314 static inline void unhold_lkb(struct dlm_lkb *lkb)
1316 kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1322 struct dlm_lkb *lkb = NULL, *iter;
1326 lkb = iter;
1331 if (!lkb)
1335 /* add/remove lkb to rsb's grant/convert/wait queue */
1337 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1339 kref_get(&lkb->lkb_ref);
1341 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1343 lkb->lkb_timestamp = ktime_get();
1345 lkb->lkb_status = status;
1349 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1350 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1352 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1356 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1357 lkb->lkb_grmode);
1360 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1363 list_add_tail(&lkb->lkb_statequeue,
1367 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1371 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1373 lkb->lkb_status = 0;
1374 list_del(&lkb->lkb_statequeue);
1375 unhold_lkb(lkb);
1378 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1380 hold_lkb(lkb);
1381 del_lkb(r, lkb);
1382 add_lkb(r, lkb, sts);
1383 unhold_lkb(lkb);
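
Taken together, the fragments from create_lkb() through move_lkb() spell out the reference discipline: create_lkb() returns holding one reference; every queue membership owns a reference (add_lkb() does kref_get(), del_lkb() drops it via unhold_lkb()); unhold_lkb() must never release the last reference (unhold_lkb_assert() fires if it would); and only __put_lkb()/dlm_put_lkb() may drop the final reference and free the lkb. move_lkb() brackets del+add with hold_lkb()/unhold_lkb() so the count stays positive while the lkb sits on no queue. A userspace model of the same rules (illustrative, no locking):

    #include <assert.h>
    #include <stdlib.h>

    struct lkb { int ref; int status; };

    static struct lkb *create_lkb(void)      /* returns holding one reference */
    {
        struct lkb *l = calloc(1, sizeof(*l));
        l->ref = 1;
        return l;
    }

    static void hold_lkb(struct lkb *l) { l->ref++; }

    static void unhold_lkb(struct lkb *l)    /* must never drop the last ref */
    {
        assert(l->ref > 1);                  /* mirrors unhold_lkb_assert() */
        l->ref--;
    }

    static void put_lkb(struct lkb *l)       /* only this may free the lkb */
    {
        if (--l->ref == 0)
            free(l);
    }

    static void add_lkb(struct lkb *l, int sts) { hold_lkb(l); l->status = sts; }
    static void del_lkb(struct lkb *l)          { l->status = 0; unhold_lkb(l); }

    static void move_lkb(struct lkb *l, int sts)
    {
        hold_lkb(l);       /* keep the count positive while on no queue */
        del_lkb(l);
        add_lkb(l, sts);
        unhold_lkb(l);
    }

    int main(void)
    {
        struct lkb *l = create_lkb();
        add_lkb(l, 1);                       /* e.g. DLM_LKSTS_WAITING */
        move_lkb(l, 2);                      /* e.g. to DLM_LKSTS_GRANTED */
        del_lkb(l);
        put_lkb(l);                          /* final reference: freed */
        return 0;
    }
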
1403 /* add/remove lkb from global waiters list of lkb's waiting for
1406 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1408 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1414 if (is_overlap_unlock(lkb) ||
1415 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1420 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1423 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1426 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1432 wc = atomic_inc_return(&lkb->lkb_wait_count);
1433 hold_lkb(lkb);
1436 lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
1437 dlm_iflags_val(lkb));
1441 wc = atomic_fetch_inc(&lkb->lkb_wait_count);
1442 DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
1443 lkb->lkb_wait_type = mstype;
1444 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1445 hold_lkb(lkb);
1446 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1450 lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1451 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
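
add_to_waiters() runs just before a request/convert/unlock/cancel message goes to a remote node; the lkb stays on ls_waiters until the matching reply removes it. lkb_wait_count tracks outstanding replies, and the OVERLAP_UNLOCK/OVERLAP_CANCEL bits record an unlock or cancel issued while an earlier operation is still in flight, so the eventual reply handler knows a second operation overlapped the first. A stripped-down model of that bookkeeping (illustrative only; the kernel also counts waits and holds references):

    #include <stdio.h>

    enum msg { MSG_NONE, MSG_REQUEST, MSG_UNLOCK, MSG_CANCEL };

    struct lkb {
        enum msg wait_type;        /* reply we are currently waiting for */
        int overlap_unlock;
        int overlap_cancel;
    };

    static void issue(struct lkb *l, enum msg m)
    {
        if (l->wait_type == MSG_NONE) {
            l->wait_type = m;                  /* send the message */
            printf("sent msg %d\n", m);
        } else if (m == MSG_UNLOCK) {
            l->overlap_unlock = 1;             /* remembered for the reply */
            printf("unlock overlaps pending msg %d\n", l->wait_type);
        } else if (m == MSG_CANCEL) {
            l->overlap_cancel = 1;
            printf("cancel overlaps pending msg %d\n", l->wait_type);
        }
    }

    int main(void)
    {
        struct lkb l = { MSG_NONE, 0, 0 };
        issue(&l, MSG_REQUEST);   /* request sent, reply outstanding */
        issue(&l, MSG_CANCEL);    /* arrives early: recorded as overlap */
        return 0;
    }
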
1456 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1461 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1464 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1468 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1469 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1475 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1476 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1485 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1487 lkb->lkb_id, lkb->lkb_wait_type);
1500 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1501 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1503 lkb->lkb_id);
1504 lkb->lkb_wait_type = 0;
1505 atomic_dec(&lkb->lkb_wait_count);
1506 unhold_lkb(lkb);
1513 if (lkb->lkb_wait_type) {
1514 lkb->lkb_wait_type = 0;
1519 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1520 lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1529 if (overlap_done && lkb->lkb_wait_type) {
1531 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1532 atomic_dec(&lkb->lkb_wait_count);
1533 unhold_lkb(lkb);
1534 lkb->lkb_wait_type = 0;
1537 DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
1539 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1540 if (atomic_dec_and_test(&lkb->lkb_wait_count))
1541 list_del_init(&lkb->lkb_wait_reply);
1542 unhold_lkb(lkb);
1546 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1548 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1552 error = _remove_from_waiters(lkb, mstype, NULL);
1560 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1563 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1568 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1720 /* lkb is master or local copy */
1722 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1730 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1733 if (!lkb->lkb_lvbptr)
1736 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1742 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1743 lkb->lkb_lvbseq = r->res_lvbseq;
1746 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1751 if (!lkb->lkb_lvbptr)
1754 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1763 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1765 lkb->lkb_lvbseq = r->res_lvbseq;
1770 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
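
set_lvb_lock() consults dlm_lvb_operations[grmode + 1][rqmode + 1]; per the comment in the source, b == 1 means the rsb's lock value block is returned to the caller, b == 0 means the caller's LVB is written to the rsb (bumping res_lvbseq), and b == -1 means do nothing. Roughly: granting into a read mode fetches the value block; converting down out of PW/EX publishes it. The three actions modeled standalone (the real decision table is not reproduced here):

    #include <stdio.h>
    #include <string.h>

    #define LVB_LEN 32

    struct rsb { char lvb[LVB_LEN]; unsigned seq; };
    struct lkb { char lvb[LVB_LEN]; unsigned seq; };

    static void set_lvb(struct rsb *r, struct lkb *l, int b)
    {
        if (b == 1) {                    /* LVB returned to caller */
            memcpy(l->lvb, r->lvb, LVB_LEN);
            l->seq = r->seq;
        } else if (b == 0) {             /* caller's LVB written to resource */
            memcpy(r->lvb, l->lvb, LVB_LEN);
            r->seq++;                    /* new version of the value block */
            l->seq = r->seq;
        }                                /* b == -1: no LVB traffic */
    }

    int main(void)
    {
        struct rsb r = { .lvb = "published value", .seq = 7 };
        struct lkb l = { .seq = 0 };

        set_lvb(&r, &l, 1);              /* e.g. grant of a read mode */
        printf("caller sees \"%s\" seq %u\n", l.lvb, l.seq);
        return 0;
    }
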
1773 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1775 if (lkb->lkb_grmode < DLM_LOCK_PW)
1778 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1783 if (!lkb->lkb_lvbptr)
1786 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1795 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1800 /* lkb is process copy (pc) */
1802 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1807 if (!lkb->lkb_lvbptr)
1810 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1813 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1818 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1819 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1823 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1824 remove_lock -- used for unlock, removes lkb from granted
1825 revert_lock -- used for cancel, moves lkb from convert to granted
1826 grant_lock -- used for request and convert, adds lkb to granted or
1827 moves lkb from convert or waiting to granted
1829 Each of these is used for master or local copy lkb's. There is
1831 a process copy (pc) lkb. */
1833 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1835 del_lkb(r, lkb);
1836 lkb->lkb_grmode = DLM_LOCK_IV;
1838 so this leads to the lkb being freed */
1839 unhold_lkb(lkb);
1842 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1844 set_lvb_unlock(r, lkb);
1845 _remove_lock(r, lkb);
1848 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1850 _remove_lock(r, lkb);
1857 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1861 lkb->lkb_rqmode = DLM_LOCK_IV;
1863 switch (lkb->lkb_status) {
1867 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1871 del_lkb(r, lkb);
1872 lkb->lkb_grmode = DLM_LOCK_IV;
1874 so this leads to the lkb being freed */
1875 unhold_lkb(lkb);
1879 log_print("invalid status for revert %d", lkb->lkb_status);
1884 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1886 return revert_lock(r, lkb);
1889 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1891 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1892 lkb->lkb_grmode = lkb->lkb_rqmode;
1893 if (lkb->lkb_status)
1894 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1896 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1899 lkb->lkb_rqmode = DLM_LOCK_IV;
1900 lkb->lkb_highbast = 0;
1903 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1905 set_lvb_lock(r, lkb);
1906 _grant_lock(r, lkb);
1909 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1912 set_lvb_lock_pc(r, lkb, ms);
1913 _grant_lock(r, lkb);
1918 lkb belongs to a remote node. */
1920 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1922 grant_lock(r, lkb);
1923 if (is_master_copy(lkb))
1924 send_grant(r, lkb);
1926 queue_cast(r, lkb, 0);
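
The remove/revert/grant helpers above are the whole queue state machine: unlock takes an lkb off the granted queue; cancel moves a converting lkb back to granted at its old mode, or removes a waiter that was never granted; grant moves a waiting or converting lkb to granted at its requested mode. grant_lock_pending() then either sends DLM_MSG_GRANT to the holder's node (master copy) or queues the completion callback locally. A single-lkb model with one status field standing in for the kernel's three list_heads (mode values as in linux/dlmconstants.h):

    #include <stdio.h>

    enum sts { STS_NONE, STS_WAITING, STS_CONVERT, STS_GRANTED };
    struct lkb { enum sts status; int gr, rq; };

    static void grant_lock(struct lkb *l)   /* request/convert granted */
    {
        l->gr = l->rq;
        l->status = STS_GRANTED;
    }

    static void revert_lock(struct lkb *l)  /* cancel */
    {
        if (l->status == STS_CONVERT)
            l->status = STS_GRANTED;        /* keeps the old granted mode */
        else if (l->status == STS_WAITING)
            l->status = STS_NONE;           /* never granted: lkb goes away */
    }

    static void remove_lock(struct lkb *l)  /* unlock */
    {
        l->status = STS_NONE;
    }

    int main(void)
    {
        struct lkb l = { STS_CONVERT, /* gr=PR */ 3, /* rq=EX */ 5 };

        revert_lock(&l);                    /* cancelled conversion */
        printf("after cancel: status %d, granted mode %d\n", l.status, l.gr);

        l.status = STS_WAITING;
        grant_lock(&l);                     /* request granted */
        printf("after grant: status %d, granted mode %d\n", l.status, l.gr);

        remove_lock(&l);                    /* unlocked */
        printf("after unlock: status %d\n", l.status);
        return 0;
    }
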
1937 static void munge_demoted(struct dlm_lkb *lkb)
1939 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1941 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1945 lkb->lkb_grmode = DLM_LOCK_NL;
1948 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
1953 lkb->lkb_id, le32_to_cpu(ms->m_type));
1957 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1958 lkb->lkb_rqmode = DLM_LOCK_PR;
1959 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1960 lkb->lkb_rqmode = DLM_LOCK_CW;
1962 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1963 dlm_print_lkb(lkb);
1967 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1971 if (lkb->lkb_id == first->lkb_id)
1977 /* Check if the given lkb conflicts with another lkb on the queue. */
1979 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1984 if (this == lkb)
1986 if (!modes_compat(this, lkb))
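
queue_conflict() fails if any existing lock's granted mode is incompatible with the new lock's requested mode; modes_compat() is backed by __dlm_compat_matrix, which also carries extra rows for the invalid and padding modes (hence the +1 indexing seen elsewhere). The table below is the classic six-mode DLM compatibility matrix:

    #include <stdio.h>

    enum { NL, CR, CW, PR, PW, EX };

    /* Granted mode vs. requested mode; mirrors the core of
       __dlm_compat_matrix without the IV/PD rows and columns. */
    static const int compat[6][6] = {
                /* NL CR CW PR PW EX */
        /* NL */ { 1, 1, 1, 1, 1, 1 },
        /* CR */ { 1, 1, 1, 1, 1, 0 },
        /* CW */ { 1, 1, 1, 0, 0, 0 },
        /* PR */ { 1, 1, 0, 1, 0, 0 },
        /* PW */ { 1, 1, 0, 0, 0, 0 },
        /* EX */ { 1, 0, 0, 0, 0, 0 },
    };

    static int queue_conflict(const int *granted, int n, int rqmode)
    {
        for (int i = 0; i < n; i++)
            if (!compat[granted[i]][rqmode])
                return 1;             /* conflict: cannot grant now */
        return 0;
    }

    int main(void)
    {
        int granted[] = { PR, PR };   /* two readers hold the resource */
        printf("request CR conflicts: %d\n", queue_conflict(granted, 2, CR)); /* 0 */
        printf("request EX conflicts: %d\n", queue_conflict(granted, 2, EX)); /* 1 */
        return 0;
    }
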
1997 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1998 * convert queue from being granted, then deadlk/demote lkb.
2007 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2008 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
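
The scenario this comment describes is the textbook conversion deadlock: two holders are both granted PR and both queue conversions to EX, so each conversion waits for the other's granted mode and neither can ever proceed. DLM breaks the cycle by cancelling one conversion with -EDEADLK, or, if that lock was requested with DLM_LKF_CONVDEADLK, by demoting its granted mode to NL and reporting it via the DEMOTED sbflag. A tiny demonstration restricted to the modes involved:

    #include <stdio.h>

    enum { NL, PR, EX };                 /* only the modes this demo needs */

    static int compat(int gr, int rq)    /* PR/PR compatible; EX only with NL */
    {
        if (gr == NL || rq == NL)
            return 1;
        return gr == PR && rq == PR;
    }

    struct lock { int gr, rq; };

    int main(void)
    {
        struct lock a = { PR, EX }, b = { PR, EX };

        /* each conversion must be compatible with the other's granted mode;
           both checks fail, so the two conversions block each other forever */
        printf("a grantable: %d\n", compat(b.gr, a.rq));   /* 0 */
        printf("b grantable: %d\n", compat(a.gr, b.rq));   /* 0 */

        /* resolution 1: cancel a's conversion, its caller sees -EDEADLK;
           resolution 2: DLM_LKF_CONVDEADLK demotes a.gr to NL (DEMOTED flag) */
        a.gr = NL;
        printf("after demoting a, b grantable: %d\n", compat(a.gr, b.rq)); /* 1 */
        return 0;
    }
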
2060 * lkb is the lock to be granted
2072 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2075 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2092 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2096 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2100 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2104 if (queue_conflict(&r->res_grantqueue, lkb))
2113 if (queue_conflict(&r->res_convertqueue, lkb))
2118 * locks for a recovered rsb, on which lkb's have been rebuilt.
2119 * The lkb's may have been rebuilt on the queues in a different
2156 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2164 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2176 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2185 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2212 first_in_list(lkb, &r->res_waitqueue))
2218 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2222 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2223 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2228 rv = _can_be_granted(r, lkb, now, recover);
2238 if (is_convert && can_be_queued(lkb) &&
2239 conversion_deadlock_detect(r, lkb)) {
2240 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2241 lkb->lkb_grmode = DLM_LOCK_NL;
2242 set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2247 lkb->lkb_id, now);
2260 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2262 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2266 lkb->lkb_rqmode = alt;
2267 rv = _can_be_granted(r, lkb, now, 0);
2269 set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2271 lkb->lkb_rqmode = rqmode;
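
The alternate-mode path above implements DLM_LKF_ALTPR/ALTCW: when the requested mode cannot be granted, _can_be_granted() is retried once with the alternate mode (PR for ALTPR, CW for ALTCW), and a successful substitute grant is reported through DLM_SBF_ALTMODE_BIT. Since PR and CW each coexist with themselves but not with each other, this lets a caller take whichever of the two incomparable modes is currently free. A sketch of the retry (illustrative, single existing holder):

    #include <stdio.h>

    enum { NL, CR, CW, PR, PW, EX };

    static int can_grant(int granted, int rqmode)
    {
        if (granted == NL)
            return 1;
        if (granted == PR && rqmode == PR)
            return 1;
        if (granted == CW && rqmode == CW)
            return 1;
        /* other pairs omitted; see the full matrix earlier */
        return 0;
    }

    int main(void)
    {
        int granted = CW;                /* another holder has CW */
        int rqmode = PR, alt = CW;       /* request PR with DLM_LKF_ALTCW */
        int sb_altmode = 0;

        if (!can_grant(granted, rqmode) && can_grant(granted, alt)) {
            rqmode = alt;                /* grant the alternate mode instead */
            sb_altmode = 1;              /* kernel sets DLM_SBF_ALTMODE_BIT */
        }
        printf("granted mode %d, altmode flag %d\n", rqmode, sb_altmode);
        return 0;
    }
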
2283 struct dlm_lkb *lkb, *s;
2294 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2295 demoted = is_demoted(lkb);
2298 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2299 grant_lock_pending(r, lkb);
2306 if (!demoted && is_demoted(lkb)) {
2308 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2319 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2320 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2321 queue_bast(r, lkb, lkb->lkb_rqmode);
2322 lkb->lkb_highbast = lkb->lkb_rqmode;
2326 lkb->lkb_id, lkb->lkb_nodeid,
2333 hi = max_t(int, lkb->lkb_rqmode, hi);
2335 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2352 struct dlm_lkb *lkb, *s;
2354 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2355 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2356 grant_lock_pending(r, lkb);
2360 high = max_t(int, lkb->lkb_rqmode, high);
2361 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2390 struct dlm_lkb *lkb, *s;
2412 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2413 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2415 lkb->lkb_grmode == DLM_LOCK_PR)
2416 queue_bast(r, lkb, DLM_LOCK_CW);
2418 queue_bast(r, lkb, high);
2419 lkb->lkb_highbast = high;
2439 struct dlm_lkb *lkb)
2445 if (gr == lkb)
2447 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2448 queue_bast(r, gr, lkb->lkb_rqmode);
2449 gr->lkb_highbast = lkb->lkb_rqmode;
2454 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2456 send_bast_queue(r, &r->res_grantqueue, lkb);
2459 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2461 send_bast_queue(r, &r->res_grantqueue, lkb);
2462 send_bast_queue(r, &r->res_convertqueue, lkb);
2465 /* set_master(r, lkb) -- set the master nodeid of a resource
2468 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2469 known, it can just be copied to the lkb and the function will return
2471 before it can be copied to the lkb.
2473 When the rsb nodeid is being looked up remotely, the initial lkb
2475 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2479 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2480 1: the rsb master is not available and the lkb has been placed on
2484 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2490 r->res_first_lkid = lkb->lkb_id;
2491 lkb->lkb_nodeid = r->res_nodeid;
2495 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2496 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2501 lkb->lkb_nodeid = 0;
2506 lkb->lkb_nodeid = r->res_master_nodeid;
2518 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2522 lkb->lkb_nodeid = 0;
2526 r->res_first_lkid = lkb->lkb_id;
2527 send_lookup(r, lkb);
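
set_master()'s contract is spelled out in the comment above: it returns 0 when the master nodeid is now stored in the lkb (0 meaning this node), and 1 when a directory lookup had to be started, or is already in flight for another lkb, in which case this lkb parks on r->res_lookup; the operation is re-driven by process_lookup_list() when the lookup reply arrives. A userspace model of the caller's side:

    #include <stdio.h>

    struct rsb { int nodeid; };          /* -1 unknown, 0 us, >0 remote node */
    struct lkb { int nodeid; };

    static int set_master(struct rsb *r, struct lkb *l)
    {
        if (r->nodeid >= 0) {
            l->nodeid = r->nodeid;
            return 0;                    /* master known, caller proceeds */
        }
        /* kernel: send_lookup() or park the lkb on r->res_lookup */
        return 1;                        /* lookup pending */
    }

    static void request_lock(struct rsb *r, struct lkb *l)
    {
        if (set_master(r, l)) {
            printf("lookup in flight; retried from process_lookup_list()\n");
            return;
        }
        if (l->nodeid == 0)
            printf("we are master: do_request()\n");
        else
            printf("send_request() to node %d\n", l->nodeid);
    }

    int main(void)
    {
        struct rsb r = { 3 };
        struct lkb l = { -1 };
        request_lock(&r, &l);            /* -> send_request() to node 3 */
        return 0;
    }
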
2533 struct dlm_lkb *lkb, *safe;
2535 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2536 list_del_init(&lkb->lkb_rsb_lookup);
2537 _request_lock(r, lkb);
2546 struct dlm_lkb *lkb;
2563 lkb the first_lkid */
2568 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2570 list_del_init(&lkb->lkb_rsb_lookup);
2571 r->res_first_lkid = lkb->lkb_id;
2572 _request_lock(r, lkb);
2630 /* these args will be copied to the lkb in validate_lock_args,
2632 an active lkb cannot be modified before locking the rsb */
2659 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2665 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2669 if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
2672 if (is_overlap(lkb))
2676 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2680 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2684 lkb->lkb_exflags = args->flags;
2685 dlm_set_sbflags_val(lkb, 0);
2686 lkb->lkb_astfn = args->astfn;
2687 lkb->lkb_astparam = args->astparam;
2688 lkb->lkb_bastfn = args->bastfn;
2689 lkb->lkb_rqmode = args->mode;
2690 lkb->lkb_lksb = args->lksb;
2691 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2692 lkb->lkb_ownpid = (int) current->pid;
2702 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2703 lkb->lkb_status, lkb->lkb_wait_type,
2704 lkb->lkb_resource->res_name);
2708 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2709 lkb->lkb_status, lkb->lkb_wait_type,
2710 lkb->lkb_resource->res_name);
2724 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2726 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2731 (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
2734 /* an lkb may be waiting for an rsb lookup to complete where the
2737 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2739 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2740 list_del_init(&lkb->lkb_rsb_lookup);
2741 queue_cast(lkb->lkb_resource, lkb,
2744 unhold_lkb(lkb); /* undoes create_lkb() */
2751 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2752 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2753 dlm_print_lkb(lkb);
2757 /* an lkb may still exist even though the lock is EOL'ed due to a
2762 if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2763 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2771 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2774 if (is_overlap(lkb))
2777 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2778 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2784 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2785 !lkb->lkb_wait_type) {
2790 switch (lkb->lkb_wait_type) {
2793 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2809 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2812 if (is_overlap_unlock(lkb))
2815 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2816 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2821 switch (lkb->lkb_wait_type) {
2824 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2835 lkb->lkb_exflags |= args->flags;
2836 dlm_set_sbflags_val(lkb, 0);
2837 lkb->lkb_astparam = args->astparam;
2847 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2848 args->flags, lkb->lkb_wait_type,
2849 lkb->lkb_resource->res_name);
2853 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2854 args->flags, lkb->lkb_wait_type,
2855 lkb->lkb_resource->res_name);
2869 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2873 if (can_be_granted(r, lkb, 1, 0, NULL)) {
2874 grant_lock(r, lkb);
2875 queue_cast(r, lkb, 0);
2879 if (can_be_queued(lkb)) {
2881 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2886 queue_cast(r, lkb, -EAGAIN);
2891 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2896 if (force_blocking_asts(lkb))
2897 send_blocking_asts_all(r, lkb);
2900 send_blocking_asts(r, lkb);
2905 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2912 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
2913 grant_lock(r, lkb);
2914 queue_cast(r, lkb, 0);
2922 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2924 revert_lock(r, lkb);
2925 queue_cast(r, lkb, -EDEADLK);
2936 if (is_demoted(lkb)) {
2938 if (_can_be_granted(r, lkb, 1, 0)) {
2939 grant_lock(r, lkb);
2940 queue_cast(r, lkb, 0);
2946 if (can_be_queued(lkb)) {
2948 del_lkb(r, lkb);
2949 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2954 queue_cast(r, lkb, -EAGAIN);
2959 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2968 if (force_blocking_asts(lkb))
2969 send_blocking_asts_all(r, lkb);
2972 send_blocking_asts(r, lkb);
2977 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2979 remove_lock(r, lkb);
2980 queue_cast(r, lkb, -DLM_EUNLOCK);
2984 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2992 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2996 error = revert_lock(r, lkb);
2998 queue_cast(r, lkb, -DLM_ECANCEL);
3004 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3016 /* add a new lkb to a possibly new rsb, called by requesting process */
3018 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3022 /* set_master: sets lkb nodeid from r */
3024 error = set_master(r, lkb);
3034 error = send_request(r, lkb);
3036 error = do_request(r, lkb);
3039 do_request_effects(r, lkb, error);
3045 /* change some property of an existing lkb, e.g. mode */
3047 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3053 error = send_convert(r, lkb);
3055 error = do_convert(r, lkb);
3058 do_convert_effects(r, lkb, error);
3064 /* remove an existing lkb from the granted queue */
3066 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3072 error = send_unlock(r, lkb);
3074 error = do_unlock(r, lkb);
3077 do_unlock_effects(r, lkb, error);
3083 /* remove an existing lkb from the convert or wait queue */
3085 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3091 error = send_cancel(r, lkb);
3093 error = do_cancel(r, lkb);
3096 do_cancel_effects(r, lkb, error);
3107 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3114 error = validate_lock_args(ls, lkb, args);
3124 attach_lkb(r, lkb);
3125 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3127 error = _request_lock(r, lkb);
3134 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3140 r = lkb->lkb_resource;
3145 error = validate_lock_args(ls, lkb, args);
3149 error = _convert_lock(r, lkb);
3156 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3162 r = lkb->lkb_resource;
3167 error = validate_unlock_args(lkb, args);
3171 error = _unlock_lock(r, lkb);
3178 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3184 r = lkb->lkb_resource;
3189 error = validate_unlock_args(lkb, args);
3193 error = _cancel_lock(r, lkb);
3216 struct dlm_lkb *lkb;
3227 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3229 error = create_lkb(ls, &lkb);
3234 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3242 error = convert_lock(ls, lkb, &args);
3244 error = request_lock(ls, lkb, name, namelen, &args);
3249 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3252 __put_lkb(ls, lkb);
3268 struct dlm_lkb *lkb;
3278 error = find_lkb(ls, lkid, &lkb);
3282 trace_dlm_unlock_start(ls, lkb, flags);
3289 error = cancel_lock(ls, lkb, &args);
3291 error = unlock_lock(ls, lkb, &args);
3298 trace_dlm_unlock_end(ls, lkb, flags, error);
3300 dlm_put_lkb(lkb);
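
dlm_lock() and dlm_unlock() above are the kernel-facing entry points declared in linux/dlm.h; results are delivered asynchronously through the completion AST and the lksb. A minimal in-kernel usage sketch, not a complete module: it assumes a lockspace handle ls obtained elsewhere from dlm_new_lockspace(), uses a hypothetical resource name, and trims error handling:

    #include <linux/completion.h>
    #include <linux/dlm.h>
    #include <linux/string.h>

    static struct dlm_lksb lksb;
    static DECLARE_COMPLETION(done);

    static void ast_fn(void *astarg)         /* completion AST */
    {
        complete(&done);                     /* result is in lksb.sb_status */
    }

    /* Take and drop an EX lock on "my-resource" (hypothetical name). */
    static int grab_and_drop(dlm_lockspace_t *ls)
    {
        int error;

        error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "my-resource",
                         strlen("my-resource"), 0 /* parent_lkid */,
                         ast_fn, NULL /* astarg */, NULL /* bast */);
        if (error)
            return error;
        wait_for_completion(&done);
        if (lksb.sb_status)                  /* 0 on a successful grant */
            return lksb.sb_status;

        reinit_completion(&done);
        error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, NULL);
        if (error)
            return error;
        wait_for_completion(&done);          /* sb_status is now -DLM_EUNLOCK */
        return 0;
    }
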
3362 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3381 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3400 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3403 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3404 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3405 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3406 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3407 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3408 ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb));
3409 ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb));
3410 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3411 ms->m_status = cpu_to_le32(lkb->lkb_status);
3412 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3413 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3417 not from lkb fields */
3419 if (lkb->lkb_bastfn)
3421 if (lkb->lkb_astfn)
3437 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3439 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3444 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3452 error = add_to_waiters(lkb, mstype, to_nodeid);
3456 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3460 send_args(r, lkb, ms);
3468 remove_from_waiters(lkb, msg_reply_type(mstype));
3472 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3474 return send_common(r, lkb, DLM_MSG_REQUEST);
3477 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3481 error = send_common(r, lkb, DLM_MSG_CONVERT);
3484 if (!error && down_conversion(lkb)) {
3485 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3488 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3494 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3498 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3500 return send_common(r, lkb, DLM_MSG_UNLOCK);
3503 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3505 return send_common(r, lkb, DLM_MSG_CANCEL);
3508 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3514 to_nodeid = lkb->lkb_nodeid;
3516 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
3521 send_args(r, lkb, ms);
3530 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3536 to_nodeid = lkb->lkb_nodeid;
3543 send_args(r, lkb, ms);
3552 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3560 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3569 send_args(r, lkb, ms);
3577 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3602 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3609 to_nodeid = lkb->lkb_nodeid;
3611 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3615 send_args(r, lkb, ms);
3624 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3626 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3629 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3631 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3634 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3636 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3639 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3641 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3669 the lkb for any type of message */
3671 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3673 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3674 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3675 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3678 static void receive_flags_reply(struct dlm_lkb *lkb,
3685 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3686 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3695 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3700 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3701 if (!lkb->lkb_lvbptr)
3702 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3703 if (!lkb->lkb_lvbptr)
3708 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3723 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3726 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3727 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3728 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3729 lkb->lkb_grmode = DLM_LOCK_IV;
3730 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3732 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3733 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3735 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3736 /* lkb was just created so there won't be an lvb yet */
3737 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3738 if (!lkb->lkb_lvbptr)
3745 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3748 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3751 if (receive_lvb(ls, lkb, ms))
3754 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3755 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3760 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3763 if (receive_lvb(ls, lkb, ms))
3768 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3773 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3774 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3775 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3779 fields in the lkb. */
3781 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3788 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3789 log_error(lkb->lkb_resource->res_ls,
3799 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3808 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3813 if (!is_process_copy(lkb))
3815 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3825 log_error(lkb->lkb_resource->res_ls,
3827 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3828 lkb->lkb_remid, dlm_iflags_val(lkb),
3829 lkb->lkb_nodeid);
3835 struct dlm_lkb *lkb;
3842 error = create_lkb(ls, &lkb);
3846 receive_flags(lkb, ms);
3847 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3848 error = receive_request_args(ls, lkb, ms);
3850 __put_lkb(ls, lkb);
3865 __put_lkb(ls, lkb);
3876 __put_lkb(ls, lkb);
3881 attach_lkb(r, lkb);
3882 error = do_request(r, lkb);
3883 send_request_reply(r, lkb, error);
3884 do_request_effects(r, lkb, error);
3892 dlm_put_lkb(lkb);
3896 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3914 struct dlm_lkb *lkb;
3918 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3922 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3924 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3925 (unsigned long long)lkb->lkb_recover_seq,
3929 dlm_put_lkb(lkb);
3933 r = lkb->lkb_resource;
3938 error = validate_message(lkb, ms);
3942 receive_flags(lkb, ms);
3944 error = receive_convert_args(ls, lkb, ms);
3946 send_convert_reply(r, lkb, error);
3950 reply = !down_conversion(lkb);
3952 error = do_convert(r, lkb);
3954 send_convert_reply(r, lkb, error);
3955 do_convert_effects(r, lkb, error);
3959 dlm_put_lkb(lkb);
3970 struct dlm_lkb *lkb;
3974 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3978 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3980 lkb->lkb_id, lkb->lkb_remid,
3984 dlm_put_lkb(lkb);
3988 r = lkb->lkb_resource;
3993 error = validate_message(lkb, ms);
3997 receive_flags(lkb, ms);
3999 error = receive_unlock_args(ls, lkb, ms);
4001 send_unlock_reply(r, lkb, error);
4005 error = do_unlock(r, lkb);
4006 send_unlock_reply(r, lkb, error);
4007 do_unlock_effects(r, lkb, error);
4011 dlm_put_lkb(lkb);
4022 struct dlm_lkb *lkb;
4026 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4030 receive_flags(lkb, ms);
4032 r = lkb->lkb_resource;
4037 error = validate_message(lkb, ms);
4041 error = do_cancel(r, lkb);
4042 send_cancel_reply(r, lkb, error);
4043 do_cancel_effects(r, lkb, error);
4047 dlm_put_lkb(lkb);
4058 struct dlm_lkb *lkb;
4062 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4066 r = lkb->lkb_resource;
4071 error = validate_message(lkb, ms);
4075 receive_flags_reply(lkb, ms, false);
4076 if (is_altmode(lkb))
4077 munge_altmode(lkb, ms);
4078 grant_lock_pc(r, lkb, ms);
4079 queue_cast(r, lkb, 0);
4083 dlm_put_lkb(lkb);
4089 struct dlm_lkb *lkb;
4093 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4097 r = lkb->lkb_resource;
4102 error = validate_message(lkb, ms);
4106 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4107 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4111 dlm_put_lkb(lkb);
4231 struct dlm_lkb *lkb;
4236 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4240 r = lkb->lkb_resource;
4244 error = validate_message(lkb, ms);
4248 mstype = lkb->lkb_wait_type;
4249 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4252 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4263 lkb->lkb_nodeid = from_nodeid;
4272 queue_cast(r, lkb, -EAGAIN);
4274 unhold_lkb(lkb); /* undoes create_lkb() */
4280 receive_flags_reply(lkb, ms, false);
4281 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4282 if (is_altmode(lkb))
4283 munge_altmode(lkb, ms);
4285 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4287 grant_lock_pc(r, lkb, ms);
4288 queue_cast(r, lkb, 0);
4297 "master %d dir %d first %x %s", lkb->lkb_id,
4306 lkb->lkb_nodeid = -1;
4309 if (is_overlap(lkb)) {
4311 queue_cast_overlap(r, lkb);
4313 unhold_lkb(lkb); /* undoes create_lkb() */
4315 _request_lock(r, lkb);
4324 lkb->lkb_id, result);
4328 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4330 lkb->lkb_id, result);
4331 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4332 send_unlock(r, lkb);
4335 &lkb->lkb_iflags)) {
4336 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4337 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4338 send_cancel(r, lkb);
4340 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4341 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4346 dlm_put_lkb(lkb);
4350 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4357 queue_cast(r, lkb, -EAGAIN);
4361 receive_flags_reply(lkb, ms, local);
4362 revert_lock_pc(r, lkb);
4363 queue_cast(r, lkb, -EDEADLK);
4368 receive_flags_reply(lkb, ms, local);
4369 if (is_demoted(lkb))
4370 munge_demoted(lkb);
4371 del_lkb(r, lkb);
4372 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4377 receive_flags_reply(lkb, ms, local);
4378 if (is_demoted(lkb))
4379 munge_demoted(lkb);
4380 grant_lock_pc(r, lkb, ms);
4381 queue_cast(r, lkb, 0);
4386 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4390 dlm_print_lkb(lkb);
4394 static void _receive_convert_reply(struct dlm_lkb *lkb,
4397 struct dlm_rsb *r = lkb->lkb_resource;
4403 error = validate_message(lkb, ms);
4408 error = remove_from_waiters_ms(lkb, ms, local);
4412 __receive_convert_reply(r, lkb, ms, local);
4421 struct dlm_lkb *lkb;
4424 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4428 _receive_convert_reply(lkb, ms, false);
4429 dlm_put_lkb(lkb);
4433 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4436 struct dlm_rsb *r = lkb->lkb_resource;
4442 error = validate_message(lkb, ms);
4447 error = remove_from_waiters_ms(lkb, ms, local);
4455 receive_flags_reply(lkb, ms, local);
4456 remove_lock_pc(r, lkb);
4457 queue_cast(r, lkb, -DLM_EUNLOCK);
4463 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4473 struct dlm_lkb *lkb;
4476 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4480 _receive_unlock_reply(lkb, ms, false);
4481 dlm_put_lkb(lkb);
4485 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4488 struct dlm_rsb *r = lkb->lkb_resource;
4494 error = validate_message(lkb, ms);
4499 error = remove_from_waiters_ms(lkb, ms, local);
4507 receive_flags_reply(lkb, ms, local);
4508 revert_lock_pc(r, lkb);
4509 queue_cast(r, lkb, -DLM_ECANCEL);
4515 lkb->lkb_id,
4526 struct dlm_lkb *lkb;
4529 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4533 _receive_cancel_reply(lkb, ms, false);
4534 dlm_put_lkb(lkb);
4541 struct dlm_lkb *lkb;
4546 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4556 r = lkb->lkb_resource;
4560 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4576 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4589 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4592 lkb->lkb_nodeid = -1;
4599 if (is_overlap(lkb)) {
4601 lkb->lkb_id, dlm_iflags_val(lkb));
4602 queue_cast_overlap(r, lkb);
4603 unhold_lkb(lkb); /* undoes create_lkb() */
4607 _request_lock(r, lkb);
4615 dlm_put_lkb(lkb);
4843 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4846 if (middle_conversion(lkb)) {
4847 hold_lkb(lkb);
4851 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4852 _receive_convert_reply(lkb, ms_local, true);
4855 lkb->lkb_grmode = DLM_LOCK_IV;
4856 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4857 unhold_lkb(lkb);
4859 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4860 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4863 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4867 /* A waiting lkb needs recovery if the master node has failed, or
4870 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4876 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4890 struct dlm_lkb *lkb, *safe;
4901 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4903 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4908 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4911 lkb->lkb_id,
4912 lkb->lkb_remid,
4913 lkb->lkb_wait_type,
4914 lkb->lkb_resource->res_nodeid,
4915 lkb->lkb_nodeid,
4916 lkb->lkb_wait_nodeid,
4923 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4924 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4928 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4931 wait_type = lkb->lkb_wait_type;
4941 if (is_overlap_cancel(lkb)) {
4943 if (lkb->lkb_grmode == DLM_LOCK_IV)
4946 if (is_overlap_unlock(lkb)) {
4948 if (lkb->lkb_grmode == DLM_LOCK_IV)
4953 lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
4960 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4964 recover_convert_waiter(ls, lkb, ms_local);
4968 hold_lkb(lkb);
4972 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4973 _receive_unlock_reply(lkb, ms_local, true);
4974 dlm_put_lkb(lkb);
4978 hold_lkb(lkb);
4982 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4983 _receive_cancel_reply(lkb, ms_local, true);
4984 dlm_put_lkb(lkb);
4988 log_error(ls, "invalid lkb wait_type %d %d",
4989 lkb->lkb_wait_type, wait_type);
4999 struct dlm_lkb *lkb = NULL, *iter;
5005 lkb = iter;
5011 return lkb;
5014 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
5015 master or dir-node for r. Processing the lkb may result in it being placed
5025 recovery. if before, the lkb may still have a pos wait_count; if after, the
5032 struct dlm_lkb *lkb;
5043 lkb = find_resend_waiter(ls);
5044 if (!lkb)
5047 r = lkb->lkb_resource;
5051 mstype = lkb->lkb_wait_type;
5053 &lkb->lkb_iflags);
5055 &lkb->lkb_iflags);
5060 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5061 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5068 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5069 lkb->lkb_wait_type = 0;
5073 while (!atomic_dec_and_test(&lkb->lkb_wait_count))
5074 unhold_lkb(lkb);
5077 list_del_init(&lkb->lkb_wait_reply);
5085 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5087 unhold_lkb(lkb); /* undoes create_lkb() */
5091 queue_cast(r, lkb, -DLM_ECANCEL);
5093 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5094 _unlock_lock(r, lkb);
5104 _request_lock(r, lkb);
5109 _convert_lock(r, lkb);
5119 lkb->lkb_id, mstype, r->res_nodeid,
5124 dlm_put_lkb(lkb);
5133 struct dlm_lkb *lkb, *safe;
5135 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5136 if (!is_master_copy(lkb))
5142 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5145 del_lkb(r, lkb);
5147 /* this put should free the lkb */
5148 if (!dlm_put_lkb(lkb))
5149 log_error(ls, "purged mstcpy lkb not released");
5166 struct dlm_lkb *lkb, *safe;
5168 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5169 if (!is_master_copy(lkb))
5172 if ((lkb->lkb_nodeid == nodeid_gone) ||
5173 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5177 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5178 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5182 del_lkb(r, lkb);
5184 /* this put should free the lkb */
5185 if (!dlm_put_lkb(lkb))
5186 log_error(ls, "purged dead lkb not released");
5267 * we are interested in are those with lkb's on either the convert or
5316 struct dlm_lkb *lkb;
5318 list_for_each_entry(lkb, head, lkb_statequeue) {
5319 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5320 return lkb;
5328 struct dlm_lkb *lkb;
5330 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5331 if (lkb)
5332 return lkb;
5333 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5334 if (lkb)
5335 return lkb;
5336 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5337 if (lkb)
5338 return lkb;
5343 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5348 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5349 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5350 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5351 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5352 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5353 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5354 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5355 lkb->lkb_rqmode = rl->rl_rqmode;
5356 lkb->lkb_grmode = rl->rl_grmode;
5359 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5360 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5362 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5367 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5368 if (!lkb->lkb_lvbptr)
5370 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5378 middle_conversion(lkb)) {
5380 lkb->lkb_grmode = DLM_LOCK_IV;
5387 /* This lkb may have been recovered in a previous aborted recovery so we need
5388 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5389 If so we just send back a standard reply. If not, we create a new lkb with
5399 struct dlm_lkb *lkb;
5436 lkb = search_remid(r, from_nodeid, remid);
5437 if (lkb) {
5442 error = create_lkb(ls, &lkb);
5446 error = receive_rcom_lock_args(ls, lkb, r, rc);
5448 __put_lkb(ls, lkb);
5452 attach_lkb(r, lkb);
5453 add_lkb(r, lkb, rl->rl_status);
5461 saving in its process-copy lkb */
5462 *rl_remid = cpu_to_le32(lkb->lkb_id);
5464 lkb->lkb_recover_seq = ls->ls_recover_seq;
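
receive_rcom_lock() can run more than once for the same remote lock if an earlier recovery aborted mid-stream, which is why search_remid() scans all three queues for an existing lkb with the same (nodeid, remid) pair before create_lkb() is called; either way the reply carries this master's lkb_id back, and the requesting node stores it as lkb_remid. The dedup step, modeled standalone:

    #include <stdio.h>

    struct lkb { int nodeid; unsigned remid; };

    static struct lkb table[16];
    static int count;

    static struct lkb *search_remid(int nodeid, unsigned remid)
    {
        for (int i = 0; i < count; i++)
            if (table[i].nodeid == nodeid && table[i].remid == remid)
                return &table[i];
        return NULL;
    }

    static struct lkb *receive_rcom_lock(int nodeid, unsigned remid)
    {
        struct lkb *l = search_remid(nodeid, remid);
        if (l)
            return l;                        /* aborted-recovery duplicate */
        table[count] = (struct lkb){ nodeid, remid };
        return &table[count++];
    }

    int main(void)
    {
        struct lkb *a = receive_rcom_lock(2, 0x10001);
        struct lkb *b = receive_rcom_lock(2, 0x10001);  /* retransmission */
        printf("same lkb: %d\n", a == b);               /* 1 */
        return 0;
    }
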
5483 struct dlm_lkb *lkb;
5491 error = find_lkb(ls, lkid, &lkb);
5499 r = lkb->lkb_resource;
5503 if (!is_process_copy(lkb)) {
5510 dlm_put_lkb(lkb);
5524 dlm_send_rcom_lock(r, lkb, seq);
5528 lkb->lkb_remid = remid;
5542 dlm_put_lkb(lkb);
5550 struct dlm_lkb *lkb;
5557 error = create_lkb(ls, &lkb);
5563 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5582 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5585 set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5586 error = request_lock(ls, lkb, name, namelen, &args);
5601 /* add this new lkb to the per-process list of locks */
5603 hold_lkb(lkb);
5604 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5608 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5610 __put_lkb(ls, lkb);
5619 struct dlm_lkb *lkb;
5626 error = find_lkb(ls, lkid, &lkb);
5630 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5635 ua = lkb->lkb_ua;
5659 error = convert_lock(ls, lkb, &args);
5664 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5665 dlm_put_lkb(lkb);
5682 struct dlm_lkb *lkb = NULL, *iter;
5698 lkb = iter;
5706 if (!lkb && found_other_mode) {
5711 if (!lkb) {
5716 lkb->lkb_exflags = flags;
5717 lkb->lkb_ownpid = (int) current->pid;
5719 ua = lkb->lkb_ua;
5730 * The lkb reference from the ls_orphans list was not
5736 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5746 struct dlm_lkb *lkb;
5753 error = find_lkb(ls, lkid, &lkb);
5757 trace_dlm_unlock_start(ls, lkb, flags);
5759 ua = lkb->lkb_ua;
5771 error = unlock_lock(ls, lkb, &args);
5782 /* dlm_user_add_cb() may have already taken lkb off the proc list */
5783 if (!list_empty(&lkb->lkb_ownqueue))
5784 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5787 trace_dlm_unlock_end(ls, lkb, flags, error);
5788 dlm_put_lkb(lkb);
5798 struct dlm_lkb *lkb;
5805 error = find_lkb(ls, lkid, &lkb);
5809 trace_dlm_unlock_start(ls, lkb, flags);
5811 ua = lkb->lkb_ua;
5820 error = cancel_lock(ls, lkb, &args);
5828 trace_dlm_unlock_end(ls, lkb, flags, error);
5829 dlm_put_lkb(lkb);
5838 struct dlm_lkb *lkb;
5846 error = find_lkb(ls, lkid, &lkb);
5850 trace_dlm_unlock_start(ls, lkb, flags);
5852 ua = lkb->lkb_ua;
5860 r = lkb->lkb_resource;
5864 error = validate_unlock_args(lkb, &args);
5867 set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
5869 error = _cancel_lock(r, lkb);
5880 trace_dlm_unlock_end(ls, lkb, flags, error);
5881 dlm_put_lkb(lkb);
5887 /* lkb's that are removed from the waiters list by revert are just left on the
5890 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5895 hold_lkb(lkb); /* reference for the ls_orphans list */
5897 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5900 set_unlock_args(0, lkb->lkb_ua, &args);
5902 error = cancel_lock(ls, lkb, &args);
5908 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
5913 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5919 lkb->lkb_ua, &args);
5921 error = unlock_lock(ls, lkb, &args);
5934 struct dlm_lkb *lkb = NULL;
5940 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5941 list_del_init(&lkb->lkb_ownqueue);
5943 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5944 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
5946 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5949 return lkb;
5953 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5957 list, and no more device_writes should add lkb's to proc->locks list; so we
5964 struct dlm_lkb *lkb, *safe;
5969 lkb = del_proc_lock(ls, proc);
5970 if (!lkb)
5972 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5973 orphan_proc_lock(ls, lkb);
5975 unlock_proc_lock(ls, lkb);
5978 added by dlm_user_request, it may result in the lkb
5981 dlm_put_lkb(lkb);
5987 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5988 list_del_init(&lkb->lkb_ownqueue);
5989 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5990 dlm_put_lkb(lkb);
5993 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5994 dlm_purge_lkb_callbacks(lkb);
5995 list_del_init(&lkb->lkb_cb_list);
5996 dlm_put_lkb(lkb);
6005 struct dlm_lkb *lkb, *safe;
6008 lkb = NULL;
6011 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6013 list_del_init(&lkb->lkb_ownqueue);
6017 if (!lkb)
6020 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6021 unlock_proc_lock(ls, lkb);
6022 dlm_put_lkb(lkb); /* ref from proc->locks list */
6026 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6027 list_del_init(&lkb->lkb_ownqueue);
6028 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6029 dlm_put_lkb(lkb);
6034 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6035 dlm_purge_lkb_callbacks(lkb);
6036 list_del_init(&lkb->lkb_cb_list);
6037 dlm_put_lkb(lkb);
6046 struct dlm_lkb *lkb, *safe;
6049 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6050 if (pid && lkb->lkb_ownpid != pid)
6052 unlock_proc_lock(ls, lkb);
6053 list_del_init(&lkb->lkb_ownqueue);
6054 dlm_put_lkb(lkb);
6098 struct dlm_lkb *lkb;
6110 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6116 dlm_set_dflags_val(lkb, lkb_dflags);
6117 lkb->lkb_nodeid = lkb_nodeid;
6118 lkb->lkb_lksb = lksb;
6121 lkb->lkb_astparam = (void *)0xDEADBEEF;
6126 __put_lkb(ls, lkb);
6131 attach_lkb(r, lkb);
6132 add_lkb(r, lkb, lkb_status);
6142 struct dlm_lkb *lkb;
6145 error = find_lkb(ls, lkb_id, &lkb);
6149 error = add_to_waiters(lkb, mstype, to_nodeid);
6150 dlm_put_lkb(lkb);