162306a36Sopenharmony_ci// SPDX-License-Identifier: GPL-2.0-only 262306a36Sopenharmony_ci/****************************************************************************** 362306a36Sopenharmony_ci******************************************************************************* 462306a36Sopenharmony_ci** 562306a36Sopenharmony_ci** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. 662306a36Sopenharmony_ci** 762306a36Sopenharmony_ci** 862306a36Sopenharmony_ci******************************************************************************* 962306a36Sopenharmony_ci******************************************************************************/ 1062306a36Sopenharmony_ci 1162306a36Sopenharmony_ci#include "dlm_internal.h" 1262306a36Sopenharmony_ci#include "member.h" 1362306a36Sopenharmony_ci#include "lock.h" 1462306a36Sopenharmony_ci#include "dir.h" 1562306a36Sopenharmony_ci#include "config.h" 1662306a36Sopenharmony_ci#include "requestqueue.h" 1762306a36Sopenharmony_ci#include "util.h" 1862306a36Sopenharmony_ci 1962306a36Sopenharmony_cistruct rq_entry { 2062306a36Sopenharmony_ci struct list_head list; 2162306a36Sopenharmony_ci uint32_t recover_seq; 2262306a36Sopenharmony_ci int nodeid; 2362306a36Sopenharmony_ci struct dlm_message request; 2462306a36Sopenharmony_ci}; 2562306a36Sopenharmony_ci 2662306a36Sopenharmony_ci/* 2762306a36Sopenharmony_ci * Requests received while the lockspace is in recovery get added to the 2862306a36Sopenharmony_ci * request queue and processed when recovery is complete. This happens when 2962306a36Sopenharmony_ci * the lockspace is suspended on some nodes before it is on others, or the 3062306a36Sopenharmony_ci * lockspace is enabled on some while still suspended on others. 3162306a36Sopenharmony_ci */ 3262306a36Sopenharmony_ci 3362306a36Sopenharmony_civoid dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, 3462306a36Sopenharmony_ci const struct dlm_message *ms) 3562306a36Sopenharmony_ci{ 3662306a36Sopenharmony_ci struct rq_entry *e; 3762306a36Sopenharmony_ci int length = le16_to_cpu(ms->m_header.h_length) - 3862306a36Sopenharmony_ci sizeof(struct dlm_message); 3962306a36Sopenharmony_ci 4062306a36Sopenharmony_ci e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS); 4162306a36Sopenharmony_ci if (!e) { 4262306a36Sopenharmony_ci log_print("dlm_add_requestqueue: out of memory len %d", length); 4362306a36Sopenharmony_ci return; 4462306a36Sopenharmony_ci } 4562306a36Sopenharmony_ci 4662306a36Sopenharmony_ci e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF; 4762306a36Sopenharmony_ci e->nodeid = nodeid; 4862306a36Sopenharmony_ci memcpy(&e->request, ms, sizeof(*ms)); 4962306a36Sopenharmony_ci memcpy(&e->request.m_extra, ms->m_extra, length); 5062306a36Sopenharmony_ci 5162306a36Sopenharmony_ci atomic_inc(&ls->ls_requestqueue_cnt); 5262306a36Sopenharmony_ci mutex_lock(&ls->ls_requestqueue_mutex); 5362306a36Sopenharmony_ci list_add_tail(&e->list, &ls->ls_requestqueue); 5462306a36Sopenharmony_ci mutex_unlock(&ls->ls_requestqueue_mutex); 5562306a36Sopenharmony_ci} 5662306a36Sopenharmony_ci 5762306a36Sopenharmony_ci/* 5862306a36Sopenharmony_ci * Called by dlm_recoverd to process normal messages saved while recovery was 5962306a36Sopenharmony_ci * happening. Normal locking has been enabled before this is called. dlm_recv 6062306a36Sopenharmony_ci * upon receiving a message, will wait for all saved messages to be drained 6162306a36Sopenharmony_ci * here before processing the message it got. If a new dlm_ls_stop() arrives 6262306a36Sopenharmony_ci * while we're processing these saved messages, it may block trying to suspend 6362306a36Sopenharmony_ci * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue. In that 6462306a36Sopenharmony_ci * case, we don't abort since locking_stopped is still 0. If dlm_recv is not 6562306a36Sopenharmony_ci * waiting for us, then this processing may be aborted due to locking_stopped. 6662306a36Sopenharmony_ci */ 6762306a36Sopenharmony_ci 6862306a36Sopenharmony_ciint dlm_process_requestqueue(struct dlm_ls *ls) 6962306a36Sopenharmony_ci{ 7062306a36Sopenharmony_ci struct rq_entry *e; 7162306a36Sopenharmony_ci struct dlm_message *ms; 7262306a36Sopenharmony_ci int error = 0; 7362306a36Sopenharmony_ci 7462306a36Sopenharmony_ci mutex_lock(&ls->ls_requestqueue_mutex); 7562306a36Sopenharmony_ci 7662306a36Sopenharmony_ci for (;;) { 7762306a36Sopenharmony_ci if (list_empty(&ls->ls_requestqueue)) { 7862306a36Sopenharmony_ci mutex_unlock(&ls->ls_requestqueue_mutex); 7962306a36Sopenharmony_ci error = 0; 8062306a36Sopenharmony_ci break; 8162306a36Sopenharmony_ci } 8262306a36Sopenharmony_ci e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); 8362306a36Sopenharmony_ci mutex_unlock(&ls->ls_requestqueue_mutex); 8462306a36Sopenharmony_ci 8562306a36Sopenharmony_ci ms = &e->request; 8662306a36Sopenharmony_ci 8762306a36Sopenharmony_ci log_limit(ls, "dlm_process_requestqueue msg %d from %d " 8862306a36Sopenharmony_ci "lkid %x remid %x result %d seq %u", 8962306a36Sopenharmony_ci le32_to_cpu(ms->m_type), 9062306a36Sopenharmony_ci le32_to_cpu(ms->m_header.h_nodeid), 9162306a36Sopenharmony_ci le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 9262306a36Sopenharmony_ci from_dlm_errno(le32_to_cpu(ms->m_result)), 9362306a36Sopenharmony_ci e->recover_seq); 9462306a36Sopenharmony_ci 9562306a36Sopenharmony_ci dlm_receive_message_saved(ls, &e->request, e->recover_seq); 9662306a36Sopenharmony_ci 9762306a36Sopenharmony_ci mutex_lock(&ls->ls_requestqueue_mutex); 9862306a36Sopenharmony_ci list_del(&e->list); 9962306a36Sopenharmony_ci if (atomic_dec_and_test(&ls->ls_requestqueue_cnt)) 10062306a36Sopenharmony_ci wake_up(&ls->ls_requestqueue_wait); 10162306a36Sopenharmony_ci kfree(e); 10262306a36Sopenharmony_ci 10362306a36Sopenharmony_ci if (dlm_locking_stopped(ls)) { 10462306a36Sopenharmony_ci log_debug(ls, "process_requestqueue abort running"); 10562306a36Sopenharmony_ci mutex_unlock(&ls->ls_requestqueue_mutex); 10662306a36Sopenharmony_ci error = -EINTR; 10762306a36Sopenharmony_ci break; 10862306a36Sopenharmony_ci } 10962306a36Sopenharmony_ci schedule(); 11062306a36Sopenharmony_ci } 11162306a36Sopenharmony_ci 11262306a36Sopenharmony_ci return error; 11362306a36Sopenharmony_ci} 11462306a36Sopenharmony_ci 11562306a36Sopenharmony_ci/* 11662306a36Sopenharmony_ci * After recovery is done, locking is resumed and dlm_recoverd takes all the 11762306a36Sopenharmony_ci * saved requests and processes them as they would have been by dlm_recv. At 11862306a36Sopenharmony_ci * the same time, dlm_recv will start receiving new requests from remote nodes. 11962306a36Sopenharmony_ci * We want to delay dlm_recv processing new requests until dlm_recoverd has 12062306a36Sopenharmony_ci * finished processing the old saved requests. We don't check for locking 12162306a36Sopenharmony_ci * stopped here because dlm_ls_stop won't stop locking until it's suspended us 12262306a36Sopenharmony_ci * (dlm_recv). 12362306a36Sopenharmony_ci */ 12462306a36Sopenharmony_ci 12562306a36Sopenharmony_civoid dlm_wait_requestqueue(struct dlm_ls *ls) 12662306a36Sopenharmony_ci{ 12762306a36Sopenharmony_ci wait_event(ls->ls_requestqueue_wait, 12862306a36Sopenharmony_ci atomic_read(&ls->ls_requestqueue_cnt) == 0); 12962306a36Sopenharmony_ci} 13062306a36Sopenharmony_ci 13162306a36Sopenharmony_cistatic int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) 13262306a36Sopenharmony_ci{ 13362306a36Sopenharmony_ci __le32 type = ms->m_type; 13462306a36Sopenharmony_ci 13562306a36Sopenharmony_ci /* the ls is being cleaned up and freed by release_lockspace */ 13662306a36Sopenharmony_ci if (!atomic_read(&ls->ls_count)) 13762306a36Sopenharmony_ci return 1; 13862306a36Sopenharmony_ci 13962306a36Sopenharmony_ci if (dlm_is_removed(ls, nodeid)) 14062306a36Sopenharmony_ci return 1; 14162306a36Sopenharmony_ci 14262306a36Sopenharmony_ci /* directory operations are always purged because the directory is 14362306a36Sopenharmony_ci always rebuilt during recovery and the lookups resent */ 14462306a36Sopenharmony_ci 14562306a36Sopenharmony_ci if (type == cpu_to_le32(DLM_MSG_REMOVE) || 14662306a36Sopenharmony_ci type == cpu_to_le32(DLM_MSG_LOOKUP) || 14762306a36Sopenharmony_ci type == cpu_to_le32(DLM_MSG_LOOKUP_REPLY)) 14862306a36Sopenharmony_ci return 1; 14962306a36Sopenharmony_ci 15062306a36Sopenharmony_ci if (!dlm_no_directory(ls)) 15162306a36Sopenharmony_ci return 0; 15262306a36Sopenharmony_ci 15362306a36Sopenharmony_ci return 1; 15462306a36Sopenharmony_ci} 15562306a36Sopenharmony_ci 15662306a36Sopenharmony_civoid dlm_purge_requestqueue(struct dlm_ls *ls) 15762306a36Sopenharmony_ci{ 15862306a36Sopenharmony_ci struct dlm_message *ms; 15962306a36Sopenharmony_ci struct rq_entry *e, *safe; 16062306a36Sopenharmony_ci 16162306a36Sopenharmony_ci mutex_lock(&ls->ls_requestqueue_mutex); 16262306a36Sopenharmony_ci list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) { 16362306a36Sopenharmony_ci ms = &e->request; 16462306a36Sopenharmony_ci 16562306a36Sopenharmony_ci if (purge_request(ls, ms, e->nodeid)) { 16662306a36Sopenharmony_ci list_del(&e->list); 16762306a36Sopenharmony_ci if (atomic_dec_and_test(&ls->ls_requestqueue_cnt)) 16862306a36Sopenharmony_ci wake_up(&ls->ls_requestqueue_wait); 16962306a36Sopenharmony_ci kfree(e); 17062306a36Sopenharmony_ci } 17162306a36Sopenharmony_ci } 17262306a36Sopenharmony_ci mutex_unlock(&ls->ls_requestqueue_mutex); 17362306a36Sopenharmony_ci} 17462306a36Sopenharmony_ci 175