Lines matching refs:lkb — all references to struct dlm_lkb in the DLM lock code; the leading number on each line is its line number in the source file.
16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
21 _request_lock(r, lkb)
22 _convert_lock(r, lkb)
23 _unlock_lock(r, lkb)
24 _cancel_lock(r, lkb)
26 do_request(r, lkb)
27 do_convert(r, lkb)
28 do_unlock(r, lkb)
29 do_cancel(r, lkb)
46 given rsb and lkb and queues callbacks.
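
The fragments above (lines 16-29) trace the three-layer call chain the file's header comment describes: request_lock/convert_lock/unlock_lock/cancel_lock validate arguments and take the rsb, the _xxx variants route the operation to the master, and do_xxx executes it there. Below is a minimal userspace sketch of the routing step only, with trimmed stand-in types and set_master() reduced to a plain copy — an illustration of the pattern, not the kernel code itself:

	#include <stdio.h>

	struct dlm_rsb { int res_nodeid; };   /* 0 = this node is the master */
	struct dlm_lkb { int lkb_nodeid; unsigned int lkb_id; };

	static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
	{ (void)r; printf("lkb %x: grant or queue locally\n", lkb->lkb_id); return 0; }

	static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
	{ printf("lkb %x: send to master node %d\n", lkb->lkb_id, r->res_nodeid); return 0; }

	/* models _request_lock(): set the master nodeid, then go local or remote */
	static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
	{
		lkb->lkb_nodeid = r->res_nodeid;   /* set_master(), simplified */
		if (r->res_nodeid == 0)
			return do_request(r, lkb);
		return send_request(r, lkb);
	}

	int main(void)
	{
		struct dlm_rsb local = { 0 }, remote = { 3 };
		struct dlm_lkb a = { 0, 0x10001 }, b = { 0, 0x10002 };
		_request_lock(&local, &a);
		_request_lock(&remote, &b);
		return 0;
	}
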
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
90 static void del_timeout(struct dlm_lkb *lkb);
160 void dlm_print_lkb(struct dlm_lkb *lkb)
162 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
167 (unsigned long long)lkb->lkb_recover_seq);
181 struct dlm_lkb *lkb;
188 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
189 dlm_print_lkb(lkb);
191 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
192 dlm_print_lkb(lkb);
194 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
195 dlm_print_lkb(lkb);
197 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
198 dlm_print_lkb(lkb);
218 static inline int can_be_queued(struct dlm_lkb *lkb)
220 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 static inline int is_demoted(struct dlm_lkb *lkb)
230 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 static inline int is_altmode(struct dlm_lkb *lkb)
235 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 static inline int is_granted(struct dlm_lkb *lkb)
240 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
249 static inline int is_process_copy(struct dlm_lkb *lkb)
251 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 static inline int is_master_copy(struct dlm_lkb *lkb)
256 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 static inline int middle_conversion(struct dlm_lkb *lkb)
261 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
262 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
267 static inline int down_conversion(struct dlm_lkb *lkb)
269 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
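
middle_conversion() and down_conversion() above depend on the DLM lock-mode ordering, in which PR and CW are not truly comparable: neither mode's privileges contain the other's, so a PR<->CW conversion is treated as a special "middle" case rather than an up- or down-conversion. A compilable sketch using the mode constants from linux/dlmconstants.h (the struct is a trimmed stand-in):

	#include <stdio.h>

	#define DLM_LOCK_IV (-1)
	#define DLM_LOCK_NL 0
	#define DLM_LOCK_CR 1
	#define DLM_LOCK_CW 2
	#define DLM_LOCK_PR 3
	#define DLM_LOCK_PW 4
	#define DLM_LOCK_EX 5

	struct dlm_lkb { int lkb_grmode, lkb_rqmode; };

	/* PR<->CW: neither direction is a clean promotion or demotion */
	static int middle_conversion(struct dlm_lkb *lkb)
	{
		return (lkb->lkb_grmode == DLM_LOCK_PR && lkb->lkb_rqmode == DLM_LOCK_CW) ||
		       (lkb->lkb_rqmode == DLM_LOCK_PR && lkb->lkb_grmode == DLM_LOCK_CW);
	}

	static int down_conversion(struct dlm_lkb *lkb)
	{
		return !middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode;
	}

	int main(void)
	{
		struct dlm_lkb ex_to_nl = { DLM_LOCK_EX, DLM_LOCK_NL };
		struct dlm_lkb pr_to_cw = { DLM_LOCK_PR, DLM_LOCK_CW };
		printf("EX->NL down=%d\n", down_conversion(&ex_to_nl));  /* 1 */
		printf("PR->CW down=%d middle=%d\n",
		       down_conversion(&pr_to_cw), middle_conversion(&pr_to_cw)); /* 0 1 */
		return 0;
	}
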
272 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
274 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
279 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 static inline int is_overlap(struct dlm_lkb *lkb)
284 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
288 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
290 if (is_master_copy(lkb))
293 del_timeout(lkb);
295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
299 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
300 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
304 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
305 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
309 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 queue_cast(r, lkb,
315 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 if (is_master_copy(lkb)) {
321 send_bast(r, lkb, rqmode);
323 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
328 * Basic operations on rsb's and lkb's
1164 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1165 The rsb must exist as long as any lkb's for it do. */
1167 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1170 lkb->lkb_resource = r;
1173 static void detach_lkb(struct dlm_lkb *lkb)
1175 if (lkb->lkb_resource) {
1176 put_rsb(lkb->lkb_resource);
1177 lkb->lkb_resource = NULL;
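
The comment at lines 1164-1165 states the invariant attach_lkb()/detach_lkb() maintain: every attached lkb holds a counted reference on its rsb, so the rsb outlives all of its lkbs. A toy refcount model of the same idea — hold_rsb()/put_rsb() stand in for the kernel's kref handling, and in the real attach_lkb() the hold is taken before the pointer is set:

	#include <stdio.h>

	struct dlm_rsb { int res_count; };
	struct dlm_lkb { struct dlm_rsb *lkb_resource; };

	static void hold_rsb(struct dlm_rsb *r) { r->res_count++; }
	static void put_rsb(struct dlm_rsb *r)
	{
		if (--r->res_count == 0)
			printf("rsb freed\n");   /* last reference dropped */
	}

	static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
	{
		hold_rsb(r);                     /* the lkb pins the rsb */
		lkb->lkb_resource = r;
	}

	static void detach_lkb(struct dlm_lkb *lkb)
	{
		if (lkb->lkb_resource) {
			put_rsb(lkb->lkb_resource);
			lkb->lkb_resource = NULL;
		}
	}

	int main(void)
	{
		struct dlm_rsb r = { 1 };        /* creator's reference */
		struct dlm_lkb a = { 0 }, b = { 0 };
		attach_lkb(&r, &a);
		attach_lkb(&r, &b);
		detach_lkb(&a);
		detach_lkb(&b);
		put_rsb(&r);                     /* now prints "rsb freed" */
		return 0;
	}
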
1183 struct dlm_lkb *lkb;
1186 lkb = dlm_allocate_lkb(ls);
1187 if (!lkb)
1190 lkb->lkb_nodeid = -1;
1191 lkb->lkb_grmode = DLM_LOCK_IV;
1192 kref_init(&lkb->lkb_ref);
1193 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1194 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1195 INIT_LIST_HEAD(&lkb->lkb_time_list);
1196 INIT_LIST_HEAD(&lkb->lkb_cb_list);
1197 mutex_init(&lkb->lkb_cb_mutex);
1198 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1202 rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
1204 lkb->lkb_id = rv;
1210 dlm_free_lkb(lkb);
1214 *lkb_ret = lkb;
1220 struct dlm_lkb *lkb;
1223 lkb = idr_find(&ls->ls_lkbidr, lkid);
1224 if (lkb)
1225 kref_get(&lkb->lkb_ref);
1228 *lkb_ret = lkb;
1229 return lkb ? 0 : -ENOENT;
1234 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1239 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1242 /* __put_lkb() is used when an lkb may not have an rsb attached to
1245 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1247 uint32_t lkid = lkb->lkb_id;
1250 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1254 detach_lkb(lkb);
1257 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1258 dlm_free_lvb(lkb->lkb_lvbptr);
1259 dlm_free_lkb(lkb);
1267 int dlm_put_lkb(struct dlm_lkb *lkb)
1271 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1272 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1274 ls = lkb->lkb_resource->res_ls;
1275 return __put_lkb(ls, lkb);
1279 a valid reference to the lkb, so there's no need for locking. */
1281 static inline void hold_lkb(struct dlm_lkb *lkb)
1283 kref_get(&lkb->lkb_ref);
1291 static inline void unhold_lkb(struct dlm_lkb *lkb)
1294 rv = kref_put(&lkb->lkb_ref, kill_lkb);
1295 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1301 struct dlm_lkb *lkb = NULL;
1303 list_for_each_entry(lkb, head, lkb_statequeue)
1304 if (lkb->lkb_rqmode < mode)
1307 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
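
lkb_add_ordered() above keeps a queue sorted by mode, highest first: it walks until the first entry whose mode is strictly lower than the new lkb's and inserts in front of it, which also preserves FIFO order among equal modes (add_lkb() below uses this for the grant queue, keyed on lkb_grmode). An array-based sketch of the same insertion rule, with the array standing in for the kernel list_head queue:

	#include <stdio.h>

	static void add_ordered(int *q, int *n, int mode)
	{
		int i = 0, j;

		while (i < *n && q[i] >= mode)   /* skip entries of >= mode */
			i++;
		for (j = *n; j > i; j--)         /* shift tail, insert at i */
			q[j] = q[j - 1];
		q[i] = mode;
		(*n)++;
	}

	int main(void)
	{
		int q[8], n = 0, i;
		int in[] = { 3 /* PR */, 5 /* EX */, 0 /* NL */, 5 /* EX */ };

		for (i = 0; i < 4; i++)
			add_ordered(q, &n, in[i]);
		for (i = 0; i < n; i++)
			printf("%d ", q[i]);     /* prints: 5 5 3 0 */
		printf("\n");
		return 0;
	}
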
1310 /* add/remove lkb to rsb's grant/convert/wait queue */
1312 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1314 kref_get(&lkb->lkb_ref);
1316 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1318 lkb->lkb_timestamp = ktime_get();
1320 lkb->lkb_status = status;
1324 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1325 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1327 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1331 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1332 lkb->lkb_grmode);
1335 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1336 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1338 list_add_tail(&lkb->lkb_statequeue,
1342 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1346 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1348 lkb->lkb_status = 0;
1349 list_del(&lkb->lkb_statequeue);
1350 unhold_lkb(lkb);
1353 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1355 hold_lkb(lkb);
1356 del_lkb(r, lkb);
1357 add_lkb(r, lkb, sts);
1358 unhold_lkb(lkb);
1395 struct dlm_lkb *lkb;
1408 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1409 if (!lkb->lkb_wait_time)
1414 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1419 lkb->lkb_wait_time = 0;
1431 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1435 "node %d", lkb->lkb_id, (long long)us,
1436 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1447 /* add/remove lkb from global waiters list of lkb's waiting for
1450 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1452 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1457 if (is_overlap_unlock(lkb) ||
1458 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1463 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1466 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1469 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1475 lkb->lkb_wait_count++;
1476 hold_lkb(lkb);
1479 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1480 lkb->lkb_wait_count, lkb->lkb_flags);
1484 DLM_ASSERT(!lkb->lkb_wait_count,
1485 dlm_print_lkb(lkb);
1486 printk("wait_count %d\n", lkb->lkb_wait_count););
1488 lkb->lkb_wait_count++;
1489 lkb->lkb_wait_type = mstype;
1490 lkb->lkb_wait_time = ktime_get();
1491 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1492 hold_lkb(lkb);
1493 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1497 lkb->lkb_id, error, lkb->lkb_flags, mstype,
1498 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1503 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1508 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1511 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1514 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1515 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1516 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1521 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1522 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1523 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1532 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1534 lkb->lkb_id, lkb->lkb_wait_type);
1547 (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1548 is_overlap_cancel(lkb) && ms && !ms->m_result) {
1550 lkb->lkb_id);
1551 lkb->lkb_wait_type = 0;
1552 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1553 lkb->lkb_wait_count--;
1554 unhold_lkb(lkb);
1561 if (lkb->lkb_wait_type) {
1562 lkb->lkb_wait_type = 0;
1567 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1568 mstype, lkb->lkb_flags);
1577 if (overlap_done && lkb->lkb_wait_type) {
1579 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1580 lkb->lkb_wait_count--;
1581 unhold_lkb(lkb);
1582 lkb->lkb_wait_type = 0;
1585 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1587 lkb->lkb_flags &= ~DLM_IFL_RESEND;
1588 lkb->lkb_wait_count--;
1589 if (!lkb->lkb_wait_count)
1590 list_del_init(&lkb->lkb_wait_reply);
1591 unhold_lkb(lkb);
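
add_to_waiters()/_remove_from_waiters() above do the bookkeeping for replies outstanding from remote nodes: each message sent bumps lkb_wait_count and takes a reference, the matching reply drops both, and the lkb leaves ls_waiters only when the count reaches zero (an overlapping unlock or cancel can hold it at two). A toy model of just that counting, with all the overlap special cases omitted:

	#include <stdio.h>

	struct lkb { int refs, wait_count, wait_type, on_waiters; };

	static void add_to_waiters(struct lkb *k, int mstype)
	{
		k->wait_count++;
		k->wait_type = mstype;
		k->refs++;                  /* hold_lkb() */
		k->on_waiters = 1;          /* list_add(&...->lkb_wait_reply) */
	}

	static void remove_from_waiters(struct lkb *k)
	{
		k->wait_count--;
		if (!k->wait_count)
			k->on_waiters = 0;  /* list_del_init() */
		k->wait_type = 0;
		k->refs--;                  /* unhold_lkb() */
	}

	int main(void)
	{
		struct lkb k = { .refs = 1 };   /* caller's reference */
		add_to_waiters(&k, 1);          /* e.g. a request message */
		remove_from_waiters(&k);        /* its reply */
		printf("refs=%d on_waiters=%d\n", k.refs, k.on_waiters); /* 1 0 */
		return 0;
	}
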
1595 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1597 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1601 error = _remove_from_waiters(lkb, mstype, NULL);
1609 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1611 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1616 error = _remove_from_waiters(lkb, ms->m_type, ms);
1814 static void add_timeout(struct dlm_lkb *lkb)
1816 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1818 if (is_master_copy(lkb))
1822 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1823 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1826 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1831 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1833 hold_lkb(lkb);
1834 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1838 static void del_timeout(struct dlm_lkb *lkb)
1840 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1843 if (!list_empty(&lkb->lkb_time_list)) {
1844 list_del_init(&lkb->lkb_time_list);
1845 unhold_lkb(lkb);
1853 to specify some special timeout-related bits in the lkb that are just to
1859 struct dlm_lkb *lkb = NULL, *iter;
1886 lkb = iter;
1891 if (!lkb)
1894 r = lkb->lkb_resource;
1900 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1901 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1902 del_timeout(lkb);
1903 dlm_timeout_warn(lkb);
1908 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1909 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1910 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1911 del_timeout(lkb);
1912 _cancel_lock(r, lkb);
1917 dlm_put_lkb(lkb);
1926 struct dlm_lkb *lkb;
1931 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1932 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1939 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1940 if (ktime_to_us(lkb->lkb_wait_time))
1941 lkb->lkb_wait_time = ktime_get();
1946 /* lkb is master or local copy */
1948 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1956 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1959 if (!lkb->lkb_lvbptr)
1962 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1968 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1969 lkb->lkb_lvbseq = r->res_lvbseq;
1972 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1977 if (!lkb->lkb_lvbptr)
1980 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1989 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1991 lkb->lkb_lvbseq = r->res_lvbseq;
1996 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1999 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2001 if (lkb->lkb_grmode < DLM_LOCK_PW)
2004 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2009 if (!lkb->lkb_lvbptr)
2012 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2021 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2026 /* lkb is process copy (pc) */
2028 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2033 if (!lkb->lkb_lvbptr)
2036 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2039 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2044 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2045 lkb->lkb_lvbseq = ms->m_lvbseq;
2049 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2050 remove_lock -- used for unlock, removes lkb from granted
2051 revert_lock -- used for cancel, moves lkb from convert to granted
2052 grant_lock -- used for request and convert, adds lkb to granted or
2053 moves lkb from convert or waiting to granted
2055 Each of these is used for master or local copy lkb's. There is
2057 a process copy (pc) lkb. */
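
The comment at lines 2049-2057 names the three queue manipulations and what drives them. Below is a userspace model of the grant/revert pair, with the queue moves reduced to a status field; IV (-1, "no mode") follows dlmconstants.h, and everything else is a simplified stand-in:

	#include <stdio.h>

	enum { IV = -1 };
	enum { GRANTED = 1, CONVERT, WAITING };

	struct lkb { int grmode, rqmode, status; };

	/* grant_lock(): a request or conversion succeeds */
	static void grant_lock(struct lkb *k)
	{
		if (k->grmode != k->rqmode) {
			k->grmode = k->rqmode;
			k->status = GRANTED;   /* add/move to granted queue */
		}
		k->rqmode = IV;                /* nothing requested any more */
	}

	/* revert_lock(): cancel an in-progress conversion or request */
	static void revert_lock(struct lkb *k)
	{
		k->rqmode = IV;
		if (k->status == CONVERT)
			k->status = GRANTED;   /* fall back to granted mode */
		else if (k->status == WAITING)
			k->grmode = IV;        /* never granted; lkb is freed */
	}

	int main(void)
	{
		struct lkb k = { .grmode = 3 /* PR */, .rqmode = 5 /* EX */,
				 .status = CONVERT };
		revert_lock(&k);
		printf("grmode=%d rqmode=%d status=%d\n",
		       k.grmode, k.rqmode, k.status);  /* 3 -1 1 */
		grant_lock(&k);                        /* no-op: gr == rq? no: rq is IV */
		return 0;
	}
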
2059 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2061 del_lkb(r, lkb);
2062 lkb->lkb_grmode = DLM_LOCK_IV;
2064 so this leads to the lkb being freed */
2065 unhold_lkb(lkb);
2068 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2070 set_lvb_unlock(r, lkb);
2071 _remove_lock(r, lkb);
2074 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2076 _remove_lock(r, lkb);
2083 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2087 lkb->lkb_rqmode = DLM_LOCK_IV;
2089 switch (lkb->lkb_status) {
2093 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2097 del_lkb(r, lkb);
2098 lkb->lkb_grmode = DLM_LOCK_IV;
2100 so this leads to the lkb being freed */
2101 unhold_lkb(lkb);
2105 log_print("invalid status for revert %d", lkb->lkb_status);
2110 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2112 return revert_lock(r, lkb);
2115 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2117 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2118 lkb->lkb_grmode = lkb->lkb_rqmode;
2119 if (lkb->lkb_status)
2120 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2122 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2125 lkb->lkb_rqmode = DLM_LOCK_IV;
2126 lkb->lkb_highbast = 0;
2129 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2131 set_lvb_lock(r, lkb);
2132 _grant_lock(r, lkb);
2135 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2138 set_lvb_lock_pc(r, lkb, ms);
2139 _grant_lock(r, lkb);
2144 lkb belongs to a remote node. */
2146 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2148 grant_lock(r, lkb);
2149 if (is_master_copy(lkb))
2150 send_grant(r, lkb);
2152 queue_cast(r, lkb, 0);
2163 static void munge_demoted(struct dlm_lkb *lkb)
2165 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2167 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2171 lkb->lkb_grmode = DLM_LOCK_NL;
2174 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2179 lkb->lkb_id, ms->m_type);
2183 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2184 lkb->lkb_rqmode = DLM_LOCK_PR;
2185 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2186 lkb->lkb_rqmode = DLM_LOCK_CW;
2188 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2189 dlm_print_lkb(lkb);
2193 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2197 if (lkb->lkb_id == first->lkb_id)
2203 /* Check if the given lkb conflicts with another lkb on the queue. */
2205 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2210 if (this == lkb)
2212 if (!modes_compat(this, lkb))
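
queue_conflict() above walks a queue and reports a conflict as soon as modes_compat() fails. modes_compat() is a table lookup, indexed with mode+1 so that IV maps to row/column 0 (the same +1 convention appears with __quecvt_compat_matrix and dlm_lvb_operations elsewhere in this listing). The sketch below reproduces the classic DLM compatibility table as used by __dlm_compat_matrix in this file and wraps it in a compilable check:

	#include <stdio.h>

	/* row = granted mode + 1, column = requested mode + 1 */
	static const int compat[8][8] = {
	      /* UN NL CR CW PR PW EX PD */
		{1, 1, 1, 1, 1, 1, 1, 0},   /* UN */
		{1, 1, 1, 1, 1, 1, 1, 0},   /* NL */
		{1, 1, 1, 1, 1, 1, 0, 0},   /* CR */
		{1, 1, 1, 1, 0, 0, 0, 0},   /* CW */
		{1, 1, 1, 0, 1, 0, 0, 0},   /* PR */
		{1, 1, 1, 0, 0, 0, 0, 0},   /* PW */
		{1, 1, 0, 0, 0, 0, 0, 0},   /* EX */
		{0, 0, 0, 0, 0, 0, 0, 0}    /* PD */
	};

	struct lkb { int grmode, rqmode; };

	static int modes_compat(const struct lkb *gr, const struct lkb *rq)
	{
		return compat[gr->grmode + 1][rq->rqmode + 1];
	}

	int main(void)
	{
		struct lkb gr_pr = { 3 /* PR */, -1 };  /* holder granted PR */
		struct lkb rq_cr = { -1, 1 /* CR */ };
		struct lkb rq_cw = { -1, 2 /* CW */ };
		printf("PR vs CR: %d\n", modes_compat(&gr_pr, &rq_cr)); /* 1 */
		printf("PR vs CW: %d\n", modes_compat(&gr_pr, &rq_cw)); /* 0 */
		return 0;
	}
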
2223 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2224 * convert queue from being granted, then deadlk/demote lkb.
2233 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2234 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2286 * lkb is the lock to be granted
2298 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2301 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2318 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2322 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2326 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2330 if (queue_conflict(&r->res_grantqueue, lkb))
2339 if (queue_conflict(&r->res_convertqueue, lkb))
2344 * locks for a recovered rsb, on which lkb's have been rebuilt.
2345 * The lkb's may have been rebuilt on the queues in a different
2382 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2390 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2402 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2411 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2438 first_in_list(lkb, &r->res_waitqueue))
2444 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2448 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2449 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2454 rv = _can_be_granted(r, lkb, now, recover);
2464 if (is_convert && can_be_queued(lkb) &&
2465 conversion_deadlock_detect(r, lkb)) {
2466 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2467 lkb->lkb_grmode = DLM_LOCK_NL;
2468 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2473 lkb->lkb_id, now);
2486 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2488 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2492 lkb->lkb_rqmode = alt;
2493 rv = _can_be_granted(r, lkb, now, 0);
2495 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2497 lkb->lkb_rqmode = rqmode;
2509 struct dlm_lkb *lkb, *s;
2520 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2521 demoted = is_demoted(lkb);
2524 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2525 grant_lock_pending(r, lkb);
2532 if (!demoted && is_demoted(lkb)) {
2534 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2545 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2546 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2547 queue_bast(r, lkb, lkb->lkb_rqmode);
2548 lkb->lkb_highbast = lkb->lkb_rqmode;
2552 lkb->lkb_id, lkb->lkb_nodeid,
2559 hi = max_t(int, lkb->lkb_rqmode, hi);
2561 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2578 struct dlm_lkb *lkb, *s;
2580 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2581 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2582 grant_lock_pending(r, lkb);
2586 high = max_t(int, lkb->lkb_rqmode, high);
2587 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2616 struct dlm_lkb *lkb, *s;
2638 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2639 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2641 lkb->lkb_grmode == DLM_LOCK_PR)
2642 queue_bast(r, lkb, DLM_LOCK_CW);
2644 queue_bast(r, lkb, high);
2645 lkb->lkb_highbast = high;
2665 struct dlm_lkb *lkb)
2671 if (gr == lkb)
2673 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2674 queue_bast(r, gr, lkb->lkb_rqmode);
2675 gr->lkb_highbast = lkb->lkb_rqmode;
2680 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2682 send_bast_queue(r, &r->res_grantqueue, lkb);
2685 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2687 send_bast_queue(r, &r->res_grantqueue, lkb);
2688 send_bast_queue(r, &r->res_convertqueue, lkb);
2691 /* set_master(r, lkb) -- set the master nodeid of a resource
2694 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2695 known, it can just be copied to the lkb and the function will return
2697 before it can be copied to the lkb.
2699 When the rsb nodeid is being looked up remotely, the initial lkb
2701 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2705 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2706 1: the rsb master is not available and the lkb has been placed on
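
The return-code contract above (0: nodeid set, proceed; 1: wait for the lookup reply) drives a parking pattern: the first lkb to hit an unknown master records itself in res_first_lkid and sends the lookup, later lkbs queue on res_lookup, and process_lookup_list() re-drives them once the reply fills in the master. A toy singly-linked model of that flow, with all types and helpers simplified assumptions:

	#include <stdio.h>

	struct lkb { unsigned int id; struct lkb *next; };
	struct rsb {
		int master_known;        /* has the lookup reply arrived? */
		unsigned int first_lkid; /* lkb that sent the lookup */
		struct lkb *lookup;      /* lkbs parked on res_lookup */
	};

	/* 0: master known, caller proceeds; 1: caller waits for the reply */
	static int set_master(struct rsb *r, struct lkb *k)
	{
		if (r->master_known)
			return 0;
		if (!r->first_lkid) {
			r->first_lkid = k->id;
			printf("lkb %x sends lookup\n", k->id);
		} else {
			k->next = r->lookup;   /* park on res_lookup */
			r->lookup = k;
			printf("lkb %x parked\n", k->id);
		}
		return 1;
	}

	/* lookup reply arrived: re-drive every parked lkb */
	static void process_lookup_list(struct rsb *r)
	{
		struct lkb *k;
		r->master_known = 1;
		r->first_lkid = 0;
		while ((k = r->lookup)) {
			r->lookup = k->next;
			printf("re-drive lkb %x\n", k->id);
		}
	}

	int main(void)
	{
		struct rsb r = { 0, 0, NULL };
		struct lkb a = { 0x10001, NULL }, b = { 0x10002, NULL };
		set_master(&r, &a);      /* sends lookup */
		set_master(&r, &b);      /* parks */
		process_lookup_list(&r);
		return 0;
	}
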
2710 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2716 r->res_first_lkid = lkb->lkb_id;
2717 lkb->lkb_nodeid = r->res_nodeid;
2721 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2722 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2727 lkb->lkb_nodeid = 0;
2732 lkb->lkb_nodeid = r->res_master_nodeid;
2744 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2748 lkb->lkb_nodeid = 0;
2754 r->res_first_lkid = lkb->lkb_id;
2755 send_lookup(r, lkb);
2761 struct dlm_lkb *lkb, *safe;
2763 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2764 list_del_init(&lkb->lkb_rsb_lookup);
2765 _request_lock(r, lkb);
2774 struct dlm_lkb *lkb;
2791 lkb the first_lkid */
2796 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2798 list_del_init(&lkb->lkb_rsb_lookup);
2799 r->res_first_lkid = lkb->lkb_id;
2800 _request_lock(r, lkb);
2859 /* these args will be copied to the lkb in validate_lock_args,
2861 an active lkb cannot be modified before locking the rsb */
2889 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2895 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2898 if (lkb->lkb_wait_type)
2901 if (is_overlap(lkb))
2905 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2909 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2913 lkb->lkb_exflags = args->flags;
2914 lkb->lkb_sbflags = 0;
2915 lkb->lkb_astfn = args->astfn;
2916 lkb->lkb_astparam = args->astparam;
2917 lkb->lkb_bastfn = args->bastfn;
2918 lkb->lkb_rqmode = args->mode;
2919 lkb->lkb_lksb = args->lksb;
2920 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2921 lkb->lkb_ownpid = (int) current->pid;
2922 lkb->lkb_timeout_cs = args->timeout;
2927 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2928 lkb->lkb_status, lkb->lkb_wait_type,
2929 lkb->lkb_resource->res_name);
2940 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2942 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2945 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2946 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2947 dlm_print_lkb(lkb);
2951 /* an lkb may still exist even though the lock is EOL'ed due to a
2955 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2956 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2961 /* an lkb may be waiting for an rsb lookup to complete where the
2964 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2966 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2967 list_del_init(&lkb->lkb_rsb_lookup);
2968 queue_cast(lkb->lkb_resource, lkb,
2971 unhold_lkb(lkb); /* undoes create_lkb() */
2981 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2984 if (is_overlap(lkb))
2988 del_timeout(lkb);
2990 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2991 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2997 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2998 !lkb->lkb_wait_type) {
3003 switch (lkb->lkb_wait_type) {
3006 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3022 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3025 if (is_overlap_unlock(lkb))
3029 del_timeout(lkb);
3031 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3032 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3037 switch (lkb->lkb_wait_type) {
3040 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3052 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3057 lkb->lkb_exflags |= args->flags;
3058 lkb->lkb_sbflags = 0;
3059 lkb->lkb_astparam = args->astparam;
3064 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3065 args->flags, lkb->lkb_wait_type,
3066 lkb->lkb_resource->res_name);
3077 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3081 if (can_be_granted(r, lkb, 1, 0, NULL)) {
3082 grant_lock(r, lkb);
3083 queue_cast(r, lkb, 0);
3087 if (can_be_queued(lkb)) {
3089 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3090 add_timeout(lkb);
3095 queue_cast(r, lkb, -EAGAIN);
3100 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3105 if (force_blocking_asts(lkb))
3106 send_blocking_asts_all(r, lkb);
3109 send_blocking_asts(r, lkb);
3114 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3121 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3122 grant_lock(r, lkb);
3123 queue_cast(r, lkb, 0);
3131 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3133 revert_lock(r, lkb);
3134 queue_cast(r, lkb, -EDEADLK);
3145 if (is_demoted(lkb)) {
3147 if (_can_be_granted(r, lkb, 1, 0)) {
3148 grant_lock(r, lkb);
3149 queue_cast(r, lkb, 0);
3155 if (can_be_queued(lkb)) {
3157 del_lkb(r, lkb);
3158 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3159 add_timeout(lkb);
3164 queue_cast(r, lkb, -EAGAIN);
3169 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3178 if (force_blocking_asts(lkb))
3179 send_blocking_asts_all(r, lkb);
3182 send_blocking_asts(r, lkb);
3187 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3189 remove_lock(r, lkb);
3190 queue_cast(r, lkb, -DLM_EUNLOCK);
3194 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3202 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3206 error = revert_lock(r, lkb);
3208 queue_cast(r, lkb, -DLM_ECANCEL);
3214 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3226 /* add a new lkb to a possibly new rsb, called by requesting process */
3228 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3232 /* set_master: sets lkb nodeid from r */
3234 error = set_master(r, lkb);
3244 error = send_request(r, lkb);
3246 error = do_request(r, lkb);
3249 do_request_effects(r, lkb, error);
3255 /* change some property of an existing lkb, e.g. mode */
3257 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3263 error = send_convert(r, lkb);
3265 error = do_convert(r, lkb);
3268 do_convert_effects(r, lkb, error);
3274 /* remove an existing lkb from the granted queue */
3276 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3282 error = send_unlock(r, lkb);
3284 error = do_unlock(r, lkb);
3287 do_unlock_effects(r, lkb, error);
3293 /* remove an existing lkb from the convert or wait queue */
3295 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3301 error = send_cancel(r, lkb);
3303 error = do_cancel(r, lkb);
3306 do_cancel_effects(r, lkb, error);
3317 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3323 error = validate_lock_args(ls, lkb, args);
3333 attach_lkb(r, lkb);
3334 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3336 error = _request_lock(r, lkb);
3343 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3349 r = lkb->lkb_resource;
3354 error = validate_lock_args(ls, lkb, args);
3358 error = _convert_lock(r, lkb);
3365 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3371 r = lkb->lkb_resource;
3376 error = validate_unlock_args(lkb, args);
3380 error = _unlock_lock(r, lkb);
3387 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3393 r = lkb->lkb_resource;
3398 error = validate_unlock_args(lkb, args);
3402 error = _cancel_lock(r, lkb);
3425 struct dlm_lkb *lkb;
3436 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3438 error = create_lkb(ls, &lkb);
3449 error = convert_lock(ls, lkb, &args);
3451 error = request_lock(ls, lkb, name, namelen, &args);
3457 __put_lkb(ls, lkb);
3473 struct dlm_lkb *lkb;
3483 error = find_lkb(ls, lkid, &lkb);
3492 error = cancel_lock(ls, lkb, &args);
3494 error = unlock_lock(ls, lkb, &args);
3501 dlm_put_lkb(lkb);
3564 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3582 if (lkb && lkb->lkb_lvbptr)
3601 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3604 ms->m_nodeid = lkb->lkb_nodeid;
3605 ms->m_pid = lkb->lkb_ownpid;
3606 ms->m_lkid = lkb->lkb_id;
3607 ms->m_remid = lkb->lkb_remid;
3608 ms->m_exflags = lkb->lkb_exflags;
3609 ms->m_sbflags = lkb->lkb_sbflags;
3610 ms->m_flags = lkb->lkb_flags;
3611 ms->m_lvbseq = lkb->lkb_lvbseq;
3612 ms->m_status = lkb->lkb_status;
3613 ms->m_grmode = lkb->lkb_grmode;
3614 ms->m_rqmode = lkb->lkb_rqmode;
3618 not from lkb fields */
3620 if (lkb->lkb_bastfn)
3622 if (lkb->lkb_astfn)
3638 if (!lkb->lkb_lvbptr)
3640 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3645 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3653 error = add_to_waiters(lkb, mstype, to_nodeid);
3657 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3661 send_args(r, lkb, ms);
3669 remove_from_waiters(lkb, msg_reply_type(mstype));
3673 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3675 return send_common(r, lkb, DLM_MSG_REQUEST);
3678 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3682 error = send_common(r, lkb, DLM_MSG_CONVERT);
3685 if (!error && down_conversion(lkb)) {
3686 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3690 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3696 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3700 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3702 return send_common(r, lkb, DLM_MSG_UNLOCK);
3705 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3707 return send_common(r, lkb, DLM_MSG_CANCEL);
3710 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3716 to_nodeid = lkb->lkb_nodeid;
3718 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3722 send_args(r, lkb, ms);
3731 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3737 to_nodeid = lkb->lkb_nodeid;
3743 send_args(r, lkb, ms);
3752 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3760 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3768 send_args(r, lkb, ms);
3776 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3800 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3807 to_nodeid = lkb->lkb_nodeid;
3809 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3813 send_args(r, lkb, ms);
3822 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3824 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3827 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3829 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3832 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3834 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3837 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3839 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3865 the lkb for any type of message */
3867 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3869 lkb->lkb_exflags = ms->m_exflags;
3870 lkb->lkb_sbflags = ms->m_sbflags;
3871 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3875 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3880 lkb->lkb_sbflags = ms->m_sbflags;
3881 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3890 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3895 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3896 if (!lkb->lkb_lvbptr)
3897 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3898 if (!lkb->lkb_lvbptr)
3903 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3918 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3921 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3922 lkb->lkb_ownpid = ms->m_pid;
3923 lkb->lkb_remid = ms->m_lkid;
3924 lkb->lkb_grmode = DLM_LOCK_IV;
3925 lkb->lkb_rqmode = ms->m_rqmode;
3927 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3928 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3930 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3931 /* lkb was just created so there won't be an lvb yet */
3932 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3933 if (!lkb->lkb_lvbptr)
3940 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3943 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3946 if (receive_lvb(ls, lkb, ms))
3949 lkb->lkb_rqmode = ms->m_rqmode;
3950 lkb->lkb_lvbseq = ms->m_lvbseq;
3955 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3958 if (receive_lvb(ls, lkb, ms))
3963 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3968 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3969 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3970 lkb->lkb_remid = ms->m_lkid;
3974 fields in the lkb. */
3976 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3982 if (ms->m_flags & DLM_IFL_USER && ~lkb->lkb_flags & DLM_IFL_USER) {
3983 log_error(lkb->lkb_resource->res_ls,
3993 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
4002 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
4007 if (!is_process_copy(lkb))
4009 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4019 log_error(lkb->lkb_resource->res_ls,
4021 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
4022 lkb->lkb_flags, lkb->lkb_nodeid);
4087 struct dlm_lkb *lkb;
4094 error = create_lkb(ls, &lkb);
4098 receive_flags(lkb, ms);
4099 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4100 error = receive_request_args(ls, lkb, ms);
4102 __put_lkb(ls, lkb);
4117 __put_lkb(ls, lkb);
4128 __put_lkb(ls, lkb);
4133 attach_lkb(r, lkb);
4134 error = do_request(r, lkb);
4135 send_request_reply(r, lkb, error);
4136 do_request_effects(r, lkb, error);
4144 dlm_put_lkb(lkb);
4148 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4180 struct dlm_lkb *lkb;
4184 error = find_lkb(ls, ms->m_remid, &lkb);
4188 if (lkb->lkb_remid != ms->m_lkid) {
4190 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4191 (unsigned long long)lkb->lkb_recover_seq,
4194 dlm_put_lkb(lkb);
4198 r = lkb->lkb_resource;
4203 error = validate_message(lkb, ms);
4207 receive_flags(lkb, ms);
4209 error = receive_convert_args(ls, lkb, ms);
4211 send_convert_reply(r, lkb, error);
4215 reply = !down_conversion(lkb);
4217 error = do_convert(r, lkb);
4219 send_convert_reply(r, lkb, error);
4220 do_convert_effects(r, lkb, error);
4224 dlm_put_lkb(lkb);
4235 struct dlm_lkb *lkb;
4239 error = find_lkb(ls, ms->m_remid, &lkb);
4243 if (lkb->lkb_remid != ms->m_lkid) {
4245 lkb->lkb_id, lkb->lkb_remid,
4248 dlm_put_lkb(lkb);
4252 r = lkb->lkb_resource;
4257 error = validate_message(lkb, ms);
4261 receive_flags(lkb, ms);
4263 error = receive_unlock_args(ls, lkb, ms);
4265 send_unlock_reply(r, lkb, error);
4269 error = do_unlock(r, lkb);
4270 send_unlock_reply(r, lkb, error);
4271 do_unlock_effects(r, lkb, error);
4275 dlm_put_lkb(lkb);
4286 struct dlm_lkb *lkb;
4290 error = find_lkb(ls, ms->m_remid, &lkb);
4294 receive_flags(lkb, ms);
4296 r = lkb->lkb_resource;
4301 error = validate_message(lkb, ms);
4305 error = do_cancel(r, lkb);
4306 send_cancel_reply(r, lkb, error);
4307 do_cancel_effects(r, lkb, error);
4311 dlm_put_lkb(lkb);
4322 struct dlm_lkb *lkb;
4326 error = find_lkb(ls, ms->m_remid, &lkb);
4330 r = lkb->lkb_resource;
4335 error = validate_message(lkb, ms);
4339 receive_flags_reply(lkb, ms);
4340 if (is_altmode(lkb))
4341 munge_altmode(lkb, ms);
4342 grant_lock_pc(r, lkb, ms);
4343 queue_cast(r, lkb, 0);
4347 dlm_put_lkb(lkb);
4353 struct dlm_lkb *lkb;
4357 error = find_lkb(ls, ms->m_remid, &lkb);
4361 r = lkb->lkb_resource;
4366 error = validate_message(lkb, ms);
4370 queue_bast(r, lkb, ms->m_bastmode);
4371 lkb->lkb_highbast = ms->m_bastmode;
4375 dlm_put_lkb(lkb);
4494 struct dlm_lkb *lkb;
4499 error = find_lkb(ls, ms->m_remid, &lkb);
4503 r = lkb->lkb_resource;
4507 error = validate_message(lkb, ms);
4511 mstype = lkb->lkb_wait_type;
4512 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4515 lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4525 lkb->lkb_nodeid = from_nodeid;
4534 queue_cast(r, lkb, -EAGAIN);
4536 unhold_lkb(lkb); /* undoes create_lkb() */
4542 receive_flags_reply(lkb, ms);
4543 lkb->lkb_remid = ms->m_lkid;
4544 if (is_altmode(lkb))
4545 munge_altmode(lkb, ms);
4547 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4548 add_timeout(lkb);
4550 grant_lock_pc(r, lkb, ms);
4551 queue_cast(r, lkb, 0);
4560 "master %d dir %d first %x %s", lkb->lkb_id,
4569 lkb->lkb_nodeid = -1;
4572 if (is_overlap(lkb)) {
4574 queue_cast_overlap(r, lkb);
4576 unhold_lkb(lkb); /* undoes create_lkb() */
4578 _request_lock(r, lkb);
4587 lkb->lkb_id, result);
4590 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4592 lkb->lkb_id, result);
4593 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4594 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4595 send_unlock(r, lkb);
4596 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4597 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4598 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4599 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4600 send_cancel(r, lkb);
4602 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4603 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4608 dlm_put_lkb(lkb);
4612 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4619 queue_cast(r, lkb, -EAGAIN);
4623 receive_flags_reply(lkb, ms);
4624 revert_lock_pc(r, lkb);
4625 queue_cast(r, lkb, -EDEADLK);
4630 receive_flags_reply(lkb, ms);
4631 if (is_demoted(lkb))
4632 munge_demoted(lkb);
4633 del_lkb(r, lkb);
4634 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4635 add_timeout(lkb);
4640 receive_flags_reply(lkb, ms);
4641 if (is_demoted(lkb))
4642 munge_demoted(lkb);
4643 grant_lock_pc(r, lkb, ms);
4644 queue_cast(r, lkb, 0);
4649 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4652 dlm_print_lkb(lkb);
4656 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4658 struct dlm_rsb *r = lkb->lkb_resource;
4664 error = validate_message(lkb, ms);
4669 error = remove_from_waiters_ms(lkb, ms);
4673 __receive_convert_reply(r, lkb, ms);
4681 struct dlm_lkb *lkb;
4684 error = find_lkb(ls, ms->m_remid, &lkb);
4688 _receive_convert_reply(lkb, ms);
4689 dlm_put_lkb(lkb);
4693 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4695 struct dlm_rsb *r = lkb->lkb_resource;
4701 error = validate_message(lkb, ms);
4706 error = remove_from_waiters_ms(lkb, ms);
4714 receive_flags_reply(lkb, ms);
4715 remove_lock_pc(r, lkb);
4716 queue_cast(r, lkb, -DLM_EUNLOCK);
4722 lkb->lkb_id, ms->m_result);
4731 struct dlm_lkb *lkb;
4734 error = find_lkb(ls, ms->m_remid, &lkb);
4738 _receive_unlock_reply(lkb, ms);
4739 dlm_put_lkb(lkb);
4743 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4745 struct dlm_rsb *r = lkb->lkb_resource;
4751 error = validate_message(lkb, ms);
4756 error = remove_from_waiters_ms(lkb, ms);
4764 receive_flags_reply(lkb, ms);
4765 revert_lock_pc(r, lkb);
4766 queue_cast(r, lkb, -DLM_ECANCEL);
4772 lkb->lkb_id, ms->m_result);
4781 struct dlm_lkb *lkb;
4784 error = find_lkb(ls, ms->m_remid, &lkb);
4788 _receive_cancel_reply(lkb, ms);
4789 dlm_put_lkb(lkb);
4795 struct dlm_lkb *lkb;
4800 error = find_lkb(ls, ms->m_lkid, &lkb);
4809 r = lkb->lkb_resource;
4813 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4829 lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4842 lkb->lkb_id, ms->m_header.h_nodeid);
4845 lkb->lkb_nodeid = -1;
4852 if (is_overlap(lkb)) {
4854 lkb->lkb_id, lkb->lkb_flags);
4855 queue_cast_overlap(r, lkb);
4856 unhold_lkb(lkb); /* undoes create_lkb() */
4860 _request_lock(r, lkb);
4868 dlm_put_lkb(lkb);
5086 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5089 if (middle_conversion(lkb)) {
5090 hold_lkb(lkb);
5095 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5096 _receive_convert_reply(lkb, ms_stub);
5099 lkb->lkb_grmode = DLM_LOCK_IV;
5100 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5101 unhold_lkb(lkb);
5103 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5104 lkb->lkb_flags |= DLM_IFL_RESEND;
5107 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5111 /* A waiting lkb needs recovery if the master node has failed, or
5114 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5120 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5134 struct dlm_lkb *lkb, *safe;
5145 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5147 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5152 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5155 lkb->lkb_id,
5156 lkb->lkb_remid,
5157 lkb->lkb_wait_type,
5158 lkb->lkb_resource->res_nodeid,
5159 lkb->lkb_nodeid,
5160 lkb->lkb_wait_nodeid,
5167 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5168 lkb->lkb_flags |= DLM_IFL_RESEND;
5172 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5175 wait_type = lkb->lkb_wait_type;
5185 if (is_overlap_cancel(lkb)) {
5187 if (lkb->lkb_grmode == DLM_LOCK_IV)
5190 if (is_overlap_unlock(lkb)) {
5192 if (lkb->lkb_grmode == DLM_LOCK_IV)
5197 lkb->lkb_id, lkb->lkb_flags, wait_type,
5204 lkb->lkb_flags |= DLM_IFL_RESEND;
5208 recover_convert_waiter(ls, lkb, ms_stub);
5212 hold_lkb(lkb);
5217 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5218 _receive_unlock_reply(lkb, ms_stub);
5219 dlm_put_lkb(lkb);
5223 hold_lkb(lkb);
5228 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5229 _receive_cancel_reply(lkb, ms_stub);
5230 dlm_put_lkb(lkb);
5234 log_error(ls, "invalid lkb wait_type %d %d",
5235 lkb->lkb_wait_type, wait_type);
5245 struct dlm_lkb *lkb = NULL, *iter;
5251 lkb = iter;
5257 return lkb;
5260 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
5261 master or dir-node for r. Processing the lkb may result in it being placed
5271 recovery. if before, the lkb may still have a pos wait_count; if after, the
5278 struct dlm_lkb *lkb;
5289 lkb = find_resend_waiter(ls);
5290 if (!lkb)
5293 r = lkb->lkb_resource;
5297 mstype = lkb->lkb_wait_type;
5298 oc = is_overlap_cancel(lkb);
5299 ou = is_overlap_unlock(lkb);
5304 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5305 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5312 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5313 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5314 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5315 lkb->lkb_wait_type = 0;
5319 while (lkb->lkb_wait_count) {
5320 lkb->lkb_wait_count--;
5321 unhold_lkb(lkb);
5324 list_del_init(&lkb->lkb_wait_reply);
5332 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5334 unhold_lkb(lkb); /* undoes create_lkb() */
5338 queue_cast(r, lkb, -DLM_ECANCEL);
5340 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5341 _unlock_lock(r, lkb);
5351 _request_lock(r, lkb);
5356 _convert_lock(r, lkb);
5366 lkb->lkb_id, mstype, r->res_nodeid,
5371 dlm_put_lkb(lkb);
5380 struct dlm_lkb *lkb, *safe;
5382 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5383 if (!is_master_copy(lkb))
5389 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5392 del_lkb(r, lkb);
5394 /* this put should free the lkb */
5395 if (!dlm_put_lkb(lkb))
5396 log_error(ls, "purged mstcpy lkb not released");
5413 struct dlm_lkb *lkb, *safe;
5415 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5416 if (!is_master_copy(lkb))
5419 if ((lkb->lkb_nodeid == nodeid_gone) ||
5420 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5424 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5425 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5429 del_lkb(r, lkb);
5431 /* this put should free the lkb */
5432 if (!dlm_put_lkb(lkb))
5433 log_error(ls, "purged dead lkb not released");
5514 * we are interested in are those with lkb's on either the convert or
5563 struct dlm_lkb *lkb;
5565 list_for_each_entry(lkb, head, lkb_statequeue) {
5566 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5567 return lkb;
5575 struct dlm_lkb *lkb;
5577 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5578 if (lkb)
5579 return lkb;
5580 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5581 if (lkb)
5582 return lkb;
5583 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5584 if (lkb)
5585 return lkb;
5590 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5595 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5596 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5597 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5598 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5599 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5600 lkb->lkb_flags |= DLM_IFL_MSTCPY;
5601 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5602 lkb->lkb_rqmode = rl->rl_rqmode;
5603 lkb->lkb_grmode = rl->rl_grmode;
5606 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5607 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5609 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5614 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5615 if (!lkb->lkb_lvbptr)
5617 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5625 middle_conversion(lkb)) {
5627 lkb->lkb_grmode = DLM_LOCK_IV;
5634 /* This lkb may have been recovered in a previous aborted recovery so we need
5635 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5636 If so we just send back a standard reply. If not, we create a new lkb with
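
The comment at lines 5634-5636 describes the dedupe step for locks re-sent during recovery: before creating a new master-copy lkb, the rsb's three queues are searched for an lkb already carrying the sender's (nodeid, remid) pair, as search_remid()/search_remid_list() earlier in the listing do (lines 5563-5585). A toy list version of that search, with simplified types:

	#include <stddef.h>
	#include <stdio.h>

	struct lkb { int nodeid; unsigned int remid; struct lkb *next; };

	static struct lkb *search_remid_list(struct lkb *q, int nodeid,
					     unsigned int remid)
	{
		for (; q; q = q->next)
			if (q->nodeid == nodeid && q->remid == remid)
				return q;
		return NULL;
	}

	/* mirrors search_remid(): grant queue, then convert, then wait */
	static struct lkb *search_remid(struct lkb *grant, struct lkb *convert,
					struct lkb *wait, int nodeid,
					unsigned int remid)
	{
		struct lkb *k;
		if ((k = search_remid_list(grant, nodeid, remid)))
			return k;
		if ((k = search_remid_list(convert, nodeid, remid)))
			return k;
		return search_remid_list(wait, nodeid, remid);
	}

	int main(void)
	{
		struct lkb g = { 2, 0x20001, NULL };
		printf("%s\n", search_remid(&g, NULL, NULL, 2, 0x20001)
			       ? "duplicate: reply, don't recreate"
			       : "new: create master-copy lkb");
		return 0;
	}
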
5645 struct dlm_lkb *lkb;
5679 lkb = search_remid(r, from_nodeid, remid);
5680 if (lkb) {
5685 error = create_lkb(ls, &lkb);
5689 error = receive_rcom_lock_args(ls, lkb, r, rc);
5691 __put_lkb(ls, lkb);
5695 attach_lkb(r, lkb);
5696 add_lkb(r, lkb, rl->rl_status);
5705 saving in its process-copy lkb */
5706 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5708 lkb->lkb_recover_seq = ls->ls_recover_seq;
5726 struct dlm_lkb *lkb;
5734 error = find_lkb(ls, lkid, &lkb);
5741 r = lkb->lkb_resource;
5745 if (!is_process_copy(lkb)) {
5751 dlm_put_lkb(lkb);
5764 dlm_send_rcom_lock(r, lkb);
5768 lkb->lkb_remid = remid;
5781 dlm_put_lkb(lkb);
5790 struct dlm_lkb *lkb;
5796 error = create_lkb(ls, &lkb);
5806 __put_lkb(ls, lkb);
5817 __put_lkb(ls, lkb);
5821 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5824 lkb->lkb_flags |= DLM_IFL_USER;
5825 error = request_lock(ls, lkb, name, namelen, &args);
5837 __put_lkb(ls, lkb);
5841 /* add this new lkb to the per-process list of locks */
5843 hold_lkb(lkb);
5844 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5855 struct dlm_lkb *lkb;
5862 error = find_lkb(ls, lkid, &lkb);
5869 ua = lkb->lkb_ua;
5893 error = convert_lock(ls, lkb, &args);
5898 dlm_put_lkb(lkb);
5915 struct dlm_lkb *lkb = NULL, *iter;
5931 lkb = iter;
5939 if (!lkb && found_other_mode) {
5944 if (!lkb) {
5949 lkb->lkb_exflags = flags;
5950 lkb->lkb_ownpid = (int) current->pid;
5952 ua = lkb->lkb_ua;
5963 * The lkb reference from the ls_orphans list was not
5969 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5979 struct dlm_lkb *lkb;
5986 error = find_lkb(ls, lkid, &lkb);
5990 ua = lkb->lkb_ua;
6002 error = unlock_lock(ls, lkb, &args);
6013 /* dlm_user_add_cb() may have already taken lkb off the proc list */
6014 if (!list_empty(&lkb->lkb_ownqueue))
6015 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6018 dlm_put_lkb(lkb);
6028 struct dlm_lkb *lkb;
6035 error = find_lkb(ls, lkid, &lkb);
6039 ua = lkb->lkb_ua;
6048 error = cancel_lock(ls, lkb, &args);
6056 dlm_put_lkb(lkb);
6065 struct dlm_lkb *lkb;
6073 error = find_lkb(ls, lkid, &lkb);
6077 ua = lkb->lkb_ua;
6085 r = lkb->lkb_resource;
6089 error = validate_unlock_args(lkb, &args);
6092 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6094 error = _cancel_lock(r, lkb);
6105 dlm_put_lkb(lkb);
6111 /* lkb's that are removed from the waiters list by revert are just left on the
6114 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6119 hold_lkb(lkb); /* reference for the ls_orphans list */
6121 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6124 set_unlock_args(0, lkb->lkb_ua, &args);
6126 error = cancel_lock(ls, lkb, &args);
6132 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6137 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6143 lkb->lkb_ua, &args);
6145 error = unlock_lock(ls, lkb, &args);
6158 struct dlm_lkb *lkb = NULL;
6164 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6165 list_del_init(&lkb->lkb_ownqueue);
6167 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6168 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6170 lkb->lkb_flags |= DLM_IFL_DEAD;
6173 return lkb;
6177 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6181 list, and no more device_writes should add lkb's to proc->locks list; so we
6188 struct dlm_lkb *lkb, *safe;
6193 lkb = del_proc_lock(ls, proc);
6194 if (!lkb)
6196 del_timeout(lkb);
6197 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6198 orphan_proc_lock(ls, lkb);
6200 unlock_proc_lock(ls, lkb);
6203 added by dlm_user_request, it may result in the lkb
6206 dlm_put_lkb(lkb);
6212 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6213 list_del_init(&lkb->lkb_ownqueue);
6214 lkb->lkb_flags |= DLM_IFL_DEAD;
6215 dlm_put_lkb(lkb);
6218 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6219 memset(&lkb->lkb_callbacks, 0,
6221 list_del_init(&lkb->lkb_cb_list);
6222 dlm_put_lkb(lkb);
6231 struct dlm_lkb *lkb, *safe;
6234 lkb = NULL;
6237 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6239 list_del_init(&lkb->lkb_ownqueue);
6243 if (!lkb)
6246 lkb->lkb_flags |= DLM_IFL_DEAD;
6247 unlock_proc_lock(ls, lkb);
6248 dlm_put_lkb(lkb); /* ref from proc->locks list */
6252 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6253 list_del_init(&lkb->lkb_ownqueue);
6254 lkb->lkb_flags |= DLM_IFL_DEAD;
6255 dlm_put_lkb(lkb);
6260 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6261 memset(&lkb->lkb_callbacks, 0,
6263 list_del_init(&lkb->lkb_cb_list);
6264 dlm_put_lkb(lkb);
6273 struct dlm_lkb *lkb, *safe;
6276 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6277 if (pid && lkb->lkb_ownpid != pid)
6279 unlock_proc_lock(ls, lkb);
6280 list_del_init(&lkb->lkb_ownqueue);
6281 dlm_put_lkb(lkb);