162306a36Sopenharmony_ci// SPDX-License-Identifier: GPL-2.0-only
262306a36Sopenharmony_ci/******************************************************************************
362306a36Sopenharmony_ci*******************************************************************************
462306a36Sopenharmony_ci**
562306a36Sopenharmony_ci**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
662306a36Sopenharmony_ci**
762306a36Sopenharmony_ci**
862306a36Sopenharmony_ci*******************************************************************************
962306a36Sopenharmony_ci******************************************************************************/
1062306a36Sopenharmony_ci
1162306a36Sopenharmony_ci#include "dlm_internal.h"
1262306a36Sopenharmony_ci#include "member.h"
1362306a36Sopenharmony_ci#include "lock.h"
1462306a36Sopenharmony_ci#include "dir.h"
1562306a36Sopenharmony_ci#include "config.h"
1662306a36Sopenharmony_ci#include "requestqueue.h"
1762306a36Sopenharmony_ci#include "util.h"
1862306a36Sopenharmony_ci
1962306a36Sopenharmony_cistruct rq_entry {
2062306a36Sopenharmony_ci	struct list_head list;
2162306a36Sopenharmony_ci	uint32_t recover_seq;
2262306a36Sopenharmony_ci	int nodeid;
2362306a36Sopenharmony_ci	struct dlm_message request;
2462306a36Sopenharmony_ci};
2562306a36Sopenharmony_ci
2662306a36Sopenharmony_ci/*
2762306a36Sopenharmony_ci * Requests received while the lockspace is in recovery get added to the
2862306a36Sopenharmony_ci * request queue and processed when recovery is complete.  This happens when
2962306a36Sopenharmony_ci * the lockspace is suspended on some nodes before it is on others, or the
3062306a36Sopenharmony_ci * lockspace is enabled on some while still suspended on others.
3162306a36Sopenharmony_ci */
3262306a36Sopenharmony_ci
3362306a36Sopenharmony_civoid dlm_add_requestqueue(struct dlm_ls *ls, int nodeid,
3462306a36Sopenharmony_ci			  const struct dlm_message *ms)
3562306a36Sopenharmony_ci{
3662306a36Sopenharmony_ci	struct rq_entry *e;
3762306a36Sopenharmony_ci	int length = le16_to_cpu(ms->m_header.h_length) -
3862306a36Sopenharmony_ci		sizeof(struct dlm_message);
3962306a36Sopenharmony_ci
4062306a36Sopenharmony_ci	e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
4162306a36Sopenharmony_ci	if (!e) {
4262306a36Sopenharmony_ci		log_print("dlm_add_requestqueue: out of memory len %d", length);
4362306a36Sopenharmony_ci		return;
4462306a36Sopenharmony_ci	}
4562306a36Sopenharmony_ci
4662306a36Sopenharmony_ci	e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
4762306a36Sopenharmony_ci	e->nodeid = nodeid;
4862306a36Sopenharmony_ci	memcpy(&e->request, ms, sizeof(*ms));
4962306a36Sopenharmony_ci	memcpy(&e->request.m_extra, ms->m_extra, length);
5062306a36Sopenharmony_ci
5162306a36Sopenharmony_ci	atomic_inc(&ls->ls_requestqueue_cnt);
5262306a36Sopenharmony_ci	mutex_lock(&ls->ls_requestqueue_mutex);
5362306a36Sopenharmony_ci	list_add_tail(&e->list, &ls->ls_requestqueue);
5462306a36Sopenharmony_ci	mutex_unlock(&ls->ls_requestqueue_mutex);
5562306a36Sopenharmony_ci}
5662306a36Sopenharmony_ci
5762306a36Sopenharmony_ci/*
5862306a36Sopenharmony_ci * Called by dlm_recoverd to process normal messages saved while recovery was
5962306a36Sopenharmony_ci * happening.  Normal locking has been enabled before this is called.  dlm_recv
6062306a36Sopenharmony_ci * upon receiving a message, will wait for all saved messages to be drained
6162306a36Sopenharmony_ci * here before processing the message it got.  If a new dlm_ls_stop() arrives
6262306a36Sopenharmony_ci * while we're processing these saved messages, it may block trying to suspend
6362306a36Sopenharmony_ci * dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue.  In that
6462306a36Sopenharmony_ci * case, we don't abort since locking_stopped is still 0.  If dlm_recv is not
6562306a36Sopenharmony_ci * waiting for us, then this processing may be aborted due to locking_stopped.
6662306a36Sopenharmony_ci */
6762306a36Sopenharmony_ci
6862306a36Sopenharmony_ciint dlm_process_requestqueue(struct dlm_ls *ls)
6962306a36Sopenharmony_ci{
7062306a36Sopenharmony_ci	struct rq_entry *e;
7162306a36Sopenharmony_ci	struct dlm_message *ms;
7262306a36Sopenharmony_ci	int error = 0;
7362306a36Sopenharmony_ci
7462306a36Sopenharmony_ci	mutex_lock(&ls->ls_requestqueue_mutex);
7562306a36Sopenharmony_ci
7662306a36Sopenharmony_ci	for (;;) {
7762306a36Sopenharmony_ci		if (list_empty(&ls->ls_requestqueue)) {
7862306a36Sopenharmony_ci			mutex_unlock(&ls->ls_requestqueue_mutex);
7962306a36Sopenharmony_ci			error = 0;
8062306a36Sopenharmony_ci			break;
8162306a36Sopenharmony_ci		}
8262306a36Sopenharmony_ci		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
8362306a36Sopenharmony_ci		mutex_unlock(&ls->ls_requestqueue_mutex);
8462306a36Sopenharmony_ci
8562306a36Sopenharmony_ci		ms = &e->request;
8662306a36Sopenharmony_ci
8762306a36Sopenharmony_ci		log_limit(ls, "dlm_process_requestqueue msg %d from %d "
8862306a36Sopenharmony_ci			  "lkid %x remid %x result %d seq %u",
8962306a36Sopenharmony_ci			  le32_to_cpu(ms->m_type),
9062306a36Sopenharmony_ci			  le32_to_cpu(ms->m_header.h_nodeid),
9162306a36Sopenharmony_ci			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
9262306a36Sopenharmony_ci			  from_dlm_errno(le32_to_cpu(ms->m_result)),
9362306a36Sopenharmony_ci			  e->recover_seq);
9462306a36Sopenharmony_ci
9562306a36Sopenharmony_ci		dlm_receive_message_saved(ls, &e->request, e->recover_seq);
9662306a36Sopenharmony_ci
9762306a36Sopenharmony_ci		mutex_lock(&ls->ls_requestqueue_mutex);
9862306a36Sopenharmony_ci		list_del(&e->list);
9962306a36Sopenharmony_ci		if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
10062306a36Sopenharmony_ci			wake_up(&ls->ls_requestqueue_wait);
10162306a36Sopenharmony_ci		kfree(e);
10262306a36Sopenharmony_ci
10362306a36Sopenharmony_ci		if (dlm_locking_stopped(ls)) {
10462306a36Sopenharmony_ci			log_debug(ls, "process_requestqueue abort running");
10562306a36Sopenharmony_ci			mutex_unlock(&ls->ls_requestqueue_mutex);
10662306a36Sopenharmony_ci			error = -EINTR;
10762306a36Sopenharmony_ci			break;
10862306a36Sopenharmony_ci		}
10962306a36Sopenharmony_ci		schedule();
11062306a36Sopenharmony_ci	}
11162306a36Sopenharmony_ci
11262306a36Sopenharmony_ci	return error;
11362306a36Sopenharmony_ci}
11462306a36Sopenharmony_ci
11562306a36Sopenharmony_ci/*
11662306a36Sopenharmony_ci * After recovery is done, locking is resumed and dlm_recoverd takes all the
11762306a36Sopenharmony_ci * saved requests and processes them as they would have been by dlm_recv.  At
11862306a36Sopenharmony_ci * the same time, dlm_recv will start receiving new requests from remote nodes.
11962306a36Sopenharmony_ci * We want to delay dlm_recv processing new requests until dlm_recoverd has
12062306a36Sopenharmony_ci * finished processing the old saved requests.  We don't check for locking
12162306a36Sopenharmony_ci * stopped here because dlm_ls_stop won't stop locking until it's suspended us
12262306a36Sopenharmony_ci * (dlm_recv).
12362306a36Sopenharmony_ci */
12462306a36Sopenharmony_ci
12562306a36Sopenharmony_civoid dlm_wait_requestqueue(struct dlm_ls *ls)
12662306a36Sopenharmony_ci{
12762306a36Sopenharmony_ci	wait_event(ls->ls_requestqueue_wait,
12862306a36Sopenharmony_ci		   atomic_read(&ls->ls_requestqueue_cnt) == 0);
12962306a36Sopenharmony_ci}
13062306a36Sopenharmony_ci
13162306a36Sopenharmony_cistatic int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
13262306a36Sopenharmony_ci{
13362306a36Sopenharmony_ci	__le32 type = ms->m_type;
13462306a36Sopenharmony_ci
13562306a36Sopenharmony_ci	/* the ls is being cleaned up and freed by release_lockspace */
13662306a36Sopenharmony_ci	if (!atomic_read(&ls->ls_count))
13762306a36Sopenharmony_ci		return 1;
13862306a36Sopenharmony_ci
13962306a36Sopenharmony_ci	if (dlm_is_removed(ls, nodeid))
14062306a36Sopenharmony_ci		return 1;
14162306a36Sopenharmony_ci
14262306a36Sopenharmony_ci	/* directory operations are always purged because the directory is
14362306a36Sopenharmony_ci	   always rebuilt during recovery and the lookups resent */
14462306a36Sopenharmony_ci
14562306a36Sopenharmony_ci	if (type == cpu_to_le32(DLM_MSG_REMOVE) ||
14662306a36Sopenharmony_ci	    type == cpu_to_le32(DLM_MSG_LOOKUP) ||
14762306a36Sopenharmony_ci	    type == cpu_to_le32(DLM_MSG_LOOKUP_REPLY))
14862306a36Sopenharmony_ci		return 1;
14962306a36Sopenharmony_ci
15062306a36Sopenharmony_ci	if (!dlm_no_directory(ls))
15162306a36Sopenharmony_ci		return 0;
15262306a36Sopenharmony_ci
15362306a36Sopenharmony_ci	return 1;
15462306a36Sopenharmony_ci}
15562306a36Sopenharmony_ci
15662306a36Sopenharmony_civoid dlm_purge_requestqueue(struct dlm_ls *ls)
15762306a36Sopenharmony_ci{
15862306a36Sopenharmony_ci	struct dlm_message *ms;
15962306a36Sopenharmony_ci	struct rq_entry *e, *safe;
16062306a36Sopenharmony_ci
16162306a36Sopenharmony_ci	mutex_lock(&ls->ls_requestqueue_mutex);
16262306a36Sopenharmony_ci	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
16362306a36Sopenharmony_ci		ms =  &e->request;
16462306a36Sopenharmony_ci
16562306a36Sopenharmony_ci		if (purge_request(ls, ms, e->nodeid)) {
16662306a36Sopenharmony_ci			list_del(&e->list);
16762306a36Sopenharmony_ci			if (atomic_dec_and_test(&ls->ls_requestqueue_cnt))
16862306a36Sopenharmony_ci				wake_up(&ls->ls_requestqueue_wait);
16962306a36Sopenharmony_ci			kfree(e);
17062306a36Sopenharmony_ci		}
17162306a36Sopenharmony_ci	}
17262306a36Sopenharmony_ci	mutex_unlock(&ls->ls_requestqueue_mutex);
17362306a36Sopenharmony_ci}
17462306a36Sopenharmony_ci
175