1// SPDX-License-Identifier: GPL-2.0-or-later
2/*
3 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
4 */
5
6#include <linux/kernel.h>
7#include <linux/sched.h>
8#include <linux/jiffies.h>
9#include <linux/module.h>
10#include <linux/fs.h>
11#include <linux/bio.h>
12#include <linux/blkdev.h>
13#include <linux/delay.h>
14#include <linux/file.h>
15#include <linux/kthread.h>
16#include <linux/configfs.h>
17#include <linux/random.h>
18#include <linux/crc32.h>
19#include <linux/time.h>
20#include <linux/debugfs.h>
21#include <linux/slab.h>
22#include <linux/bitmap.h>
23#include <linux/ktime.h>
24#include "heartbeat.h"
25#include "tcp.h"
26#include "nodemanager.h"
27#include "quorum.h"
28
29#include "masklog.h"
30
31
/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls.  This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 *
 * o2hb_run_event_list() takes it for write while delivering callbacks.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * Multiple hb threads are watching multiple regions.  A node is live
 * whenever any of the threads sees activity from the node in its region.
 *
 * o2hb_live_lock is held around accesses to the live-node state below,
 * the pending event list, the region list and the region bitmaps.
 */
static DEFINE_SPINLOCK(o2hb_live_lock);
/* Per node: list of disk slots (via ds_live_item) currently seeing it live. */
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
/* Bit n is set while o2hb_live_slots[n] is non-empty. */
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
/* Node up/down events queued by o2hb_queue_node_event(), awaiting callbacks. */
static LIST_HEAD(o2hb_node_events);
/* NOTE(review): no user visible in this part of the file - presumably
 * waited on until a region reaches steady state; confirm. */
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

/*
 * In global heartbeat, we maintain a series of region bitmaps.
 *	- o2hb_region_bitmap allows us to limit the region number to max region.
 *	- o2hb_live_region_bitmap tracks live regions (seen steady iterations).
 *	- o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
 *		heartbeat on it.
 *	- o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
 *
 * Each bitmap is indexed by a region's hr_region_num.
 */
static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
62
/*
 * Kinds of debug buffer, stored in o2hb_debug_buf.db_type.
 * NOTE(review): judging by the names, the REGION_* types pair with the
 * per-region hr_db_* buffers in struct o2hb_region and the rest with the
 * file-scope buffers below - confirm against the debugfs code.
 */
#define O2HB_DB_TYPE_LIVENODES		0
#define O2HB_DB_TYPE_LIVEREGIONS	1
#define O2HB_DB_TYPE_QUORUMREGIONS	2
#define O2HB_DB_TYPE_FAILEDREGIONS	3
#define O2HB_DB_TYPE_REGION_LIVENODES	4
#define O2HB_DB_TYPE_REGION_NUMBER	5
#define O2HB_DB_TYPE_REGION_ELAPSED_TIME	6
#define O2HB_DB_TYPE_REGION_PINNED	7
/* Descriptor handed to the debug code for one exported value. */
struct o2hb_debug_buf {
	int db_type;	/* one of O2HB_DB_TYPE_* */
	int db_size;	/* presumably the size of *db_data - TODO confirm */
	int db_len;	/* presumably an element/bit count - TODO confirm */
	void *db_data;	/* the data being exported */
};

/* Descriptors for the global (not per-region) debug values. */
static struct o2hb_debug_buf *o2hb_db_livenodes;
static struct o2hb_debug_buf *o2hb_db_liveregions;
static struct o2hb_debug_buf *o2hb_db_quorumregions;
static struct o2hb_debug_buf *o2hb_db_failedregions;
82
/* Names of the debug directory and the entries created beneath it
 * (presumably in debugfs, given o2hb_debug_dir is a dentry - confirm). */
#define O2HB_DEBUG_DIR			"o2hb"
#define O2HB_DEBUG_LIVENODES		"livenodes"
#define O2HB_DEBUG_LIVEREGIONS		"live_regions"
#define O2HB_DEBUG_QUORUMREGIONS	"quorum_regions"
#define O2HB_DEBUG_FAILEDREGIONS	"failed_regions"
#define O2HB_DEBUG_REGION_NUMBER	"num"
#define O2HB_DEBUG_REGION_ELAPSED_TIME	"elapsed_time_in_ms"
#define O2HB_DEBUG_REGION_PINNED	"pinned"

static struct dentry *o2hb_debug_dir;

/* List of regions; checked for emptiness under o2hb_live_lock before the
 * dead threshold or the heartbeat mode may be changed. */
static LIST_HEAD(o2hb_all_regions);

/* One list of registered o2hb_callback_func entries per callback type. */
static struct o2hb_callback {
	struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];
99
/* Maps a callback type to its o2hb_callback; callers test the result with
 * IS_ERR(). */
static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

/* Heartbeat modes; O2HB_HEARTBEAT_NUM_MODES is the count, not a mode. */
enum o2hb_heartbeat_modes {
	O2HB_HEARTBEAT_LOCAL		= 0,
	O2HB_HEARTBEAT_GLOBAL,
	O2HB_HEARTBEAT_NUM_MODES,
};

/* Human-readable mode names, indexed by enum o2hb_heartbeat_modes. */
static const char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
	"local",	/* O2HB_HEARTBEAT_LOCAL */
	"global",	/* O2HB_HEARTBEAT_GLOBAL */
};

/* Number of unchanged samples after which a live node is declared dead;
 * multiplied by O2HB_REGION_TIMEOUT_MS to get the advertised dead time. */
unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
/* Current mode; only changeable while o2hb_all_regions is empty. */
static unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;

/*
 * o2hb_dependent_users tracks the number of registered callbacks that depend
 * on heartbeat. o2net and o2dlm are two entities that register this callback.
 * However only o2dlm depends on the heartbeat. It does not want the heartbeat
 * to stop while a dlm domain is still active.
 */
static unsigned int o2hb_dependent_users;

/*
 * In global heartbeat mode, all regions are pinned if there are one or more
 * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
 * regions are unpinned if the region count exceeds the cut off or the number
 * of dependent users falls to zero.
 */
#define O2HB_PIN_CUT_OFF		3

/*
 * In local heartbeat mode, we assume the dlm domain name to be the same as
 * region uuid. This is true for domains created for the file system but not
 * necessarily true for userdlm domains. This is a known limitation.
 *
 * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
 * works for both file system and userdlm domains.
 */
static int o2hb_region_pin(const char *region_uuid);
static void o2hb_region_unpin(const char *region_uuid);
142
143/* Only sets a new threshold if there are no active regions.
144 *
145 * No locking or otherwise interesting code is required for reading
146 * o2hb_dead_threshold as it can't change once regions are active and
147 * it's not interesting to anyone until then anyway. */
148static void o2hb_dead_threshold_set(unsigned int threshold)
149{
150	if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
151		spin_lock(&o2hb_live_lock);
152		if (list_empty(&o2hb_all_regions))
153			o2hb_dead_threshold = threshold;
154		spin_unlock(&o2hb_live_lock);
155	}
156}
157
158static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
159{
160	int ret = -1;
161
162	if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
163		spin_lock(&o2hb_live_lock);
164		if (list_empty(&o2hb_all_regions)) {
165			o2hb_heartbeat_mode = hb_mode;
166			ret = 0;
167		}
168		spin_unlock(&o2hb_live_lock);
169	}
170
171	return ret;
172}
173
/* A pending node up/down notification, queued on o2hb_node_events until
 * o2hb_run_event_list() delivers it to the registered callbacks. */
struct o2hb_node_event {
	/* link on o2hb_node_events; empty once the event has been dequeued */
	struct list_head        hn_item;
	/* O2HB_NODE_UP_CB or O2HB_NODE_DOWN_CB */
	enum o2hb_callback_type hn_event_type;
	/* node the event is about; may be NULL for down events only */
	struct o2nm_node        *hn_node;
	/* node number, passed to callbacks alongside hn_node */
	int                     hn_node_num;
};
180
/* Per-node tracking state for one heartbeat slot of a region. */
struct o2hb_disk_slot {
	/* the slot's heartbeat block as read from / written to disk */
	struct o2hb_disk_heartbeat_block *ds_raw_block;
	/* node number this slot belongs to */
	u8			ds_node_num;
	/* hb_seq value seen on the most recent check */
	u64			ds_last_time;
	/* hb_generation value seen on the most recent check */
	u64			ds_last_generation;
	/* count of samples in which hb_seq did not change */
	u16			ds_equal_samples;
	/* count of samples in which hb_seq changed */
	u16			ds_changed_samples;
	/* link on o2hb_live_slots[ds_node_num] while the node is live here */
	struct list_head	ds_live_item;
};
190
/* Each thread owns a region.  When we're asked to tear down the region
 * we ask the thread to stop, and the thread cleans up the region. */
struct o2hb_region {
	/* configfs item; its name is used as the region name in log messages */
	struct config_item	hr_item;

	/* presumably the link on o2hb_all_regions - TODO confirm */
	struct list_head	hr_all_item;
	unsigned		hr_unclean_stop:1,
				hr_aborted_start:1,
				hr_item_pinned:1,
				hr_item_dropped:1,
				hr_node_deleted:1;

	/* protected by the hr_callback_sem */
	/* NOTE(review): no hr_callback_sem is visible in this part of the
	 * file; o2hb_callback_sem may be what is meant - confirm. */
	struct task_struct 	*hr_task;

	unsigned int		hr_blocks;
	/* added to the slot number to form the on-disk block number */
	unsigned long long	hr_start_block;

	/* shift that turns a slot number into a byte offset */
	unsigned int		hr_block_bits;
	/* bytes per heartbeat block; length used for copies and the crc */
	unsigned int		hr_block_bytes;

	unsigned int		hr_slots_per_page;
	unsigned int		hr_num_pages;

	/* pages backing the slots; slot n is in page n / hr_slots_per_page */
	struct page             **hr_slot_data;
	/* device the heartbeat I/O is issued against */
	struct block_device	*hr_bdev;
	/* per-node slot state, indexed by node number */
	struct o2hb_disk_slot	*hr_slots;

	/* live node map of this region */
	unsigned long		hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	/* this region's bit index in the global region bitmaps */
	unsigned int		hr_region_num;

	struct dentry		*hr_debug_dir;
	struct o2hb_debug_buf	*hr_db_livenodes;
	struct o2hb_debug_buf	*hr_db_regnum;
	struct o2hb_debug_buf	*hr_db_elapsed_time;
	struct o2hb_debug_buf	*hr_db_pinned;

	/* let the person setting up hb wait for it to return until it
	 * has reached a 'steady' state.  This will be fixed when we have
	 * a more complete api that doesn't lead to this sort of fragility. */
	atomic_t		hr_steady_iterations;

	/* terminate o2hb thread if it does not reach steady state
	 * (hr_steady_iterations == 0) within hr_unsteady_iterations */
	atomic_t		hr_unsteady_iterations;

	unsigned int		hr_timeout_ms;

	/* randomized as the region goes up and down so that a node
	 * recognizes a node going up and down in one iteration */
	u64			hr_generation;

	/* fires o2hb_write_timeout() if not re-armed in time */
	struct delayed_work	hr_write_timeout_work;
	/* jiffies value the reported write-timeout duration is measured from */
	unsigned long		hr_last_timeout_start;

	/* negotiate timer, used to negotiate extending hb timeout. */
	struct delayed_work	hr_nego_timeout_work;
	/* nodes that have reported a hung write; zeroed when timers are armed */
	unsigned long		hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];

	/* Used during o2hb_check_slot to hold a copy of the block
	 * being checked because we temporarily have to zero out the
	 * crc field. */
	struct o2hb_disk_heartbeat_block *hr_tmp_block;

	/* Message key for negotiate timeout message. */
	unsigned int		hr_key;
	struct list_head	hr_handler_list;

	/* last hb status, 0 for success, other value for error. */
	int			hr_last_hb_status;
};
263
/* Tracks a batch of in-flight bios so the submitter can wait for all of
 * them to finish. */
struct o2hb_bio_wait_ctxt {
	/* starts at 1 (the submitter's own reference), +1 per submitted bio */
	atomic_t          wc_num_reqs;
	/* completed when wc_num_reqs drops to zero */
	struct completion wc_io_complete;
	/* errno of a failed bio, 0 if none failed */
	int               wc_error;
};
269
/* Delay before the negotiate timer fires: half the write timeout, so that
 * negotiation always starts before the write timeout would expire. */
#define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)

/* Message types passed to o2net for the timeout negotiation. */
enum {
	/* sent to the master node: "my heartbeat write is hung" */
	O2HB_NEGO_TIMEOUT_MSG = 1,
	/* sent by the master node: "re-arm your timeout" */
	O2HB_NEGO_APPROVE_MSG = 2,
};

/* Payload of both negotiate messages. */
struct o2hb_nego_msg {
	u8 node_num;	/* node number of the sender */
};
280
281static void o2hb_write_timeout(struct work_struct *work)
282{
283	int failed, quorum;
284	struct o2hb_region *reg =
285		container_of(work, struct o2hb_region,
286			     hr_write_timeout_work.work);
287
288	mlog(ML_ERROR, "Heartbeat write timeout to device %pg after %u "
289	     "milliseconds\n", reg->hr_bdev,
290	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
291
292	if (o2hb_global_heartbeat_active()) {
293		spin_lock(&o2hb_live_lock);
294		if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
295			set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
296		failed = bitmap_weight(o2hb_failed_region_bitmap,
297					O2NM_MAX_REGIONS);
298		quorum = bitmap_weight(o2hb_quorum_region_bitmap,
299					O2NM_MAX_REGIONS);
300		spin_unlock(&o2hb_live_lock);
301
302		mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
303		     quorum, failed);
304
305		/*
306		 * Fence if the number of failed regions >= half the number
307		 * of  quorum regions
308		 */
309		if ((failed << 1) < quorum)
310			return;
311	}
312
313	o2quo_disk_timeout();
314}
315
316static void o2hb_arm_timeout(struct o2hb_region *reg)
317{
318	/* Arm writeout only after thread reaches steady state */
319	if (atomic_read(&reg->hr_steady_iterations) != 0)
320		return;
321
322	mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
323	     O2HB_MAX_WRITE_TIMEOUT_MS);
324
325	if (o2hb_global_heartbeat_active()) {
326		spin_lock(&o2hb_live_lock);
327		clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
328		spin_unlock(&o2hb_live_lock);
329	}
330	cancel_delayed_work(&reg->hr_write_timeout_work);
331	schedule_delayed_work(&reg->hr_write_timeout_work,
332			      msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
333
334	cancel_delayed_work(&reg->hr_nego_timeout_work);
335	/* negotiate timeout must be less than write timeout. */
336	schedule_delayed_work(&reg->hr_nego_timeout_work,
337			      msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
338	bitmap_zero(reg->hr_nego_node_bitmap, O2NM_MAX_NODES);
339}
340
/* Stop both timers of @reg, using the _sync cancel variants for the write
 * timeout and then the negotiate timeout work. */
static void o2hb_disarm_timeout(struct o2hb_region *reg)
{
	cancel_delayed_work_sync(&reg->hr_write_timeout_work);
	cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
}
346
347static int o2hb_send_nego_msg(int key, int type, u8 target)
348{
349	struct o2hb_nego_msg msg;
350	int status, ret;
351
352	msg.node_num = o2nm_this_node();
353again:
354	ret = o2net_send_message(type, key, &msg, sizeof(msg),
355			target, &status);
356
357	if (ret == -EAGAIN || ret == -ENOMEM) {
358		msleep(100);
359		goto again;
360	}
361
362	return ret;
363}
364
365static void o2hb_nego_timeout(struct work_struct *work)
366{
367	unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
368	int master_node, i, ret;
369	struct o2hb_region *reg;
370
371	reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
372	/* don't negotiate timeout if last hb failed since it is very
373	 * possible io failed. Should let write timeout fence self.
374	 */
375	if (reg->hr_last_hb_status)
376		return;
377
378	o2hb_fill_node_map(live_node_bitmap, O2NM_MAX_NODES);
379	/* lowest node as master node to make negotiate decision. */
380	master_node = find_first_bit(live_node_bitmap, O2NM_MAX_NODES);
381
382	if (master_node == o2nm_this_node()) {
383		if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
384			printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%pg).\n",
385				o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
386				config_item_name(&reg->hr_item), reg->hr_bdev);
387			set_bit(master_node, reg->hr_nego_node_bitmap);
388		}
389		if (!bitmap_equal(reg->hr_nego_node_bitmap, live_node_bitmap,
390				  O2NM_MAX_NODES)) {
391			/* check negotiate bitmap every second to do timeout
392			 * approve decision.
393			 */
394			schedule_delayed_work(&reg->hr_nego_timeout_work,
395				msecs_to_jiffies(1000));
396
397			return;
398		}
399
400		printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%pg) is down.\n",
401			config_item_name(&reg->hr_item), reg->hr_bdev);
402		/* approve negotiate timeout request. */
403		o2hb_arm_timeout(reg);
404
405		i = -1;
406		while ((i = find_next_bit(live_node_bitmap,
407				O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
408			if (i == master_node)
409				continue;
410
411			mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
412			ret = o2hb_send_nego_msg(reg->hr_key,
413					O2HB_NEGO_APPROVE_MSG, i);
414			if (ret)
415				mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
416					i, ret);
417		}
418	} else {
419		/* negotiate timeout with master node. */
420		printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%pg), negotiate timeout with node %d.\n",
421			o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
422			reg->hr_bdev, master_node);
423		ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
424				master_node);
425		if (ret)
426			mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
427				master_node, ret);
428	}
429}
430
431static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
432				void **ret_data)
433{
434	struct o2hb_region *reg = data;
435	struct o2hb_nego_msg *nego_msg;
436
437	nego_msg = (struct o2hb_nego_msg *)msg->buf;
438	printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%pg).\n",
439		nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_bdev);
440	if (nego_msg->node_num < O2NM_MAX_NODES)
441		set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
442	else
443		mlog(ML_ERROR, "got nego timeout message from bad node.\n");
444
445	return 0;
446}
447
/*
 * Message handler for O2HB_NEGO_APPROVE_MSG: the master node approved the
 * extension, so re-arm the timers of the region passed in @data.
 * @msg, @len and @ret_data are unused.  Always returns 0.
 */
static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
				void **ret_data)
{
	struct o2hb_region *reg = data;

	printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%pg).\n",
		config_item_name(&reg->hr_item), reg->hr_bdev);
	o2hb_arm_timeout(reg);
	return 0;
}
458
459static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
460{
461	atomic_set(&wc->wc_num_reqs, 1);
462	init_completion(&wc->wc_io_complete);
463	wc->wc_error = 0;
464}
465
466/* Used in error paths too */
467static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
468				     unsigned int num)
469{
470	/* sadly atomic_sub_and_test() isn't available on all platforms.  The
471	 * good news is that the fast path only completes one at a time */
472	while(num--) {
473		if (atomic_dec_and_test(&wc->wc_num_reqs)) {
474			BUG_ON(num > 0);
475			complete(&wc->wc_io_complete);
476		}
477	}
478}
479
/* Drop the initial reference set up by o2hb_bio_wait_init() and block
 * until every submitted bio has completed. */
static void o2hb_wait_on_io(struct o2hb_bio_wait_ctxt *wc)
{
	o2hb_bio_wait_dec(wc, 1);
	wait_for_completion(&wc->wc_io_complete);
}
485
/*
 * Completion callback for heartbeat bios.  Logs and records a failure in
 * the wait context found in bi_private (a later failing bio overwrites an
 * earlier error), drops that bio's reference on the context and releases
 * the bio.
 */
static void o2hb_bio_end_io(struct bio *bio)
{
	struct o2hb_bio_wait_ctxt *wc = bio->bi_private;

	if (bio->bi_status) {
		mlog(ML_ERROR, "IO Error %d\n", bio->bi_status);
		wc->wc_error = blk_status_to_errno(bio->bi_status);
	}

	o2hb_bio_wait_dec(wc, 1);
	bio_put(bio);
}
498
499/* Setup a Bio to cover I/O against num_slots slots starting at
500 * start_slot. */
501static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
502				      struct o2hb_bio_wait_ctxt *wc,
503				      unsigned int *current_slot,
504				      unsigned int max_slots, blk_opf_t opf)
505{
506	int len, current_page;
507	unsigned int vec_len, vec_start;
508	unsigned int bits = reg->hr_block_bits;
509	unsigned int spp = reg->hr_slots_per_page;
510	unsigned int cs = *current_slot;
511	struct bio *bio;
512	struct page *page;
513
514	/* Testing has shown this allocation to take long enough under
515	 * GFP_KERNEL that the local node can get fenced. It would be
516	 * nicest if we could pre-allocate these bios and avoid this
517	 * all together. */
518	bio = bio_alloc(reg->hr_bdev, 16, opf, GFP_ATOMIC);
519	if (!bio) {
520		mlog(ML_ERROR, "Could not alloc slots BIO!\n");
521		bio = ERR_PTR(-ENOMEM);
522		goto bail;
523	}
524
525	/* Must put everything in 512 byte sectors for the bio... */
526	bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
527	bio->bi_private = wc;
528	bio->bi_end_io = o2hb_bio_end_io;
529
530	vec_start = (cs << bits) % PAGE_SIZE;
531	while(cs < max_slots) {
532		current_page = cs / spp;
533		page = reg->hr_slot_data[current_page];
534
535		vec_len = min(PAGE_SIZE - vec_start,
536			      (max_slots-cs) * (PAGE_SIZE/spp) );
537
538		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
539		     current_page, vec_len, vec_start);
540
541		len = bio_add_page(bio, page, vec_len, vec_start);
542		if (len != vec_len) break;
543
544		cs += vec_len / (PAGE_SIZE/spp);
545		vec_start = 0;
546	}
547
548bail:
549	*current_slot = cs;
550	return bio;
551}
552
553static int o2hb_read_slots(struct o2hb_region *reg,
554			   unsigned int begin_slot,
555			   unsigned int max_slots)
556{
557	unsigned int current_slot = begin_slot;
558	int status;
559	struct o2hb_bio_wait_ctxt wc;
560	struct bio *bio;
561
562	o2hb_bio_wait_init(&wc);
563
564	while(current_slot < max_slots) {
565		bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots,
566					 REQ_OP_READ);
567		if (IS_ERR(bio)) {
568			status = PTR_ERR(bio);
569			mlog_errno(status);
570			goto bail_and_wait;
571		}
572
573		atomic_inc(&wc.wc_num_reqs);
574		submit_bio(bio);
575	}
576
577	status = 0;
578
579bail_and_wait:
580	o2hb_wait_on_io(&wc);
581	if (wc.wc_error && !status)
582		status = wc.wc_error;
583
584	return status;
585}
586
587static int o2hb_issue_node_write(struct o2hb_region *reg,
588				 struct o2hb_bio_wait_ctxt *write_wc)
589{
590	int status;
591	unsigned int slot;
592	struct bio *bio;
593
594	o2hb_bio_wait_init(write_wc);
595
596	slot = o2nm_this_node();
597
598	bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1,
599				 REQ_OP_WRITE | REQ_SYNC);
600	if (IS_ERR(bio)) {
601		status = PTR_ERR(bio);
602		mlog_errno(status);
603		goto bail;
604	}
605
606	atomic_inc(&write_wc->wc_num_reqs);
607	submit_bio(bio);
608
609	status = 0;
610bail:
611	return status;
612}
613
614static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
615				     struct o2hb_disk_heartbeat_block *hb_block)
616{
617	__le32 old_cksum;
618	u32 ret;
619
620	/* We want to compute the block crc with a 0 value in the
621	 * hb_cksum field. Save it off here and replace after the
622	 * crc. */
623	old_cksum = hb_block->hb_cksum;
624	hb_block->hb_cksum = 0;
625
626	ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
627
628	hb_block->hb_cksum = old_cksum;
629
630	return ret;
631}
632
/* Log the sequence, node, checksum and generation fields of a heartbeat
 * block at error level. */
static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
{
	mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
	     "cksum = 0x%x, generation 0x%llx\n",
	     (long long)le64_to_cpu(hb_block->hb_seq),
	     hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
	     (long long)le64_to_cpu(hb_block->hb_generation));
}
641
642static int o2hb_verify_crc(struct o2hb_region *reg,
643			   struct o2hb_disk_heartbeat_block *hb_block)
644{
645	u32 read, computed;
646
647	read = le32_to_cpu(hb_block->hb_cksum);
648	computed = o2hb_compute_block_crc_le(reg, hb_block);
649
650	return read == computed;
651}
652
653/*
654 * Compare the slot data with what we wrote in the last iteration.
655 * If the match fails, print an appropriate error message. This is to
656 * detect errors like... another node hearting on the same slot,
657 * flaky device that is losing writes, etc.
658 * Returns 1 if check succeeds, 0 otherwise.
659 */
660static int o2hb_check_own_slot(struct o2hb_region *reg)
661{
662	struct o2hb_disk_slot *slot;
663	struct o2hb_disk_heartbeat_block *hb_block;
664	char *errstr;
665
666	slot = &reg->hr_slots[o2nm_this_node()];
667	/* Don't check on our 1st timestamp */
668	if (!slot->ds_last_time)
669		return 0;
670
671	hb_block = slot->ds_raw_block;
672	if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
673	    le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
674	    hb_block->hb_node == slot->ds_node_num)
675		return 1;
676
677#define ERRSTR1		"Another node is heartbeating on device"
678#define ERRSTR2		"Heartbeat generation mismatch on device"
679#define ERRSTR3		"Heartbeat sequence mismatch on device"
680
681	if (hb_block->hb_node != slot->ds_node_num)
682		errstr = ERRSTR1;
683	else if (le64_to_cpu(hb_block->hb_generation) !=
684		 slot->ds_last_generation)
685		errstr = ERRSTR2;
686	else
687		errstr = ERRSTR3;
688
689	mlog(ML_ERROR, "%s (%pg): expected(%u:0x%llx, 0x%llx), "
690	     "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_bdev,
691	     slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
692	     (unsigned long long)slot->ds_last_time, hb_block->hb_node,
693	     (unsigned long long)le64_to_cpu(hb_block->hb_generation),
694	     (unsigned long long)le64_to_cpu(hb_block->hb_seq));
695
696	return 0;
697}
698
699static inline void o2hb_prepare_block(struct o2hb_region *reg,
700				      u64 generation)
701{
702	int node_num;
703	u64 cputime;
704	struct o2hb_disk_slot *slot;
705	struct o2hb_disk_heartbeat_block *hb_block;
706
707	node_num = o2nm_this_node();
708	slot = &reg->hr_slots[node_num];
709
710	hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
711	memset(hb_block, 0, reg->hr_block_bytes);
712	/* TODO: time stuff */
713	cputime = ktime_get_real_seconds();
714	if (!cputime)
715		cputime = 1;
716
717	hb_block->hb_seq = cpu_to_le64(cputime);
718	hb_block->hb_node = node_num;
719	hb_block->hb_generation = cpu_to_le64(generation);
720	hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
721
722	/* This step must always happen last! */
723	hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
724								   hb_block));
725
726	mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
727	     (long long)generation,
728	     le32_to_cpu(hb_block->hb_cksum));
729}
730
731static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
732				struct o2nm_node *node,
733				int idx)
734{
735	struct o2hb_callback_func *f;
736
737	list_for_each_entry(f, &hbcall->list, hc_item) {
738		mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
739		(f->hc_func)(node, idx, f->hc_data);
740	}
741}
742
743/* Will run the list in order until we process the passed event */
744static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
745{
746	struct o2hb_callback *hbcall;
747	struct o2hb_node_event *event;
748
749	/* Holding callback sem assures we don't alter the callback
750	 * lists when doing this, and serializes ourselves with other
751	 * processes wanting callbacks. */
752	down_write(&o2hb_callback_sem);
753
754	spin_lock(&o2hb_live_lock);
755	while (!list_empty(&o2hb_node_events)
756	       && !list_empty(&queued_event->hn_item)) {
757		event = list_entry(o2hb_node_events.next,
758				   struct o2hb_node_event,
759				   hn_item);
760		list_del_init(&event->hn_item);
761		spin_unlock(&o2hb_live_lock);
762
763		mlog(ML_HEARTBEAT, "Node %s event for %d\n",
764		     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
765		     event->hn_node_num);
766
767		hbcall = hbcall_from_type(event->hn_event_type);
768
769		/* We should *never* have gotten on to the list with a
770		 * bad type... This isn't something that we should try
771		 * to recover from. */
772		BUG_ON(IS_ERR(hbcall));
773
774		o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
775
776		spin_lock(&o2hb_live_lock);
777	}
778	spin_unlock(&o2hb_live_lock);
779
780	up_write(&o2hb_callback_sem);
781}
782
783static void o2hb_queue_node_event(struct o2hb_node_event *event,
784				  enum o2hb_callback_type type,
785				  struct o2nm_node *node,
786				  int node_num)
787{
788	assert_spin_locked(&o2hb_live_lock);
789
790	BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
791
792	event->hn_event_type = type;
793	event->hn_node = node;
794	event->hn_node_num = node_num;
795
796	mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
797	     type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
798
799	list_add_tail(&event->hn_item, &o2hb_node_events);
800}
801
802static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
803{
804	struct o2hb_node_event event =
805		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
806	struct o2nm_node *node;
807	int queued = 0;
808
809	node = o2nm_get_node_by_num(slot->ds_node_num);
810	if (!node)
811		return;
812
813	spin_lock(&o2hb_live_lock);
814	if (!list_empty(&slot->ds_live_item)) {
815		mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
816		     slot->ds_node_num);
817
818		list_del_init(&slot->ds_live_item);
819
820		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
821			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
822
823			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
824					      slot->ds_node_num);
825			queued = 1;
826		}
827	}
828	spin_unlock(&o2hb_live_lock);
829
830	if (queued)
831		o2hb_run_event_list(&event);
832
833	o2nm_node_put(node);
834}
835
836static void o2hb_set_quorum_device(struct o2hb_region *reg)
837{
838	if (!o2hb_global_heartbeat_active())
839		return;
840
841	/* Prevent race with o2hb_heartbeat_group_drop_item() */
842	if (kthread_should_stop())
843		return;
844
845	/* Tag region as quorum only after thread reaches steady state */
846	if (atomic_read(&reg->hr_steady_iterations) != 0)
847		return;
848
849	spin_lock(&o2hb_live_lock);
850
851	if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
852		goto unlock;
853
854	/*
855	 * A region can be added to the quorum only when it sees all
856	 * live nodes heartbeat on it. In other words, the region has been
857	 * added to all nodes.
858	 */
859	if (!bitmap_equal(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
860			  O2NM_MAX_NODES))
861		goto unlock;
862
863	printk(KERN_NOTICE "o2hb: Region %s (%pg) is now a quorum device\n",
864	       config_item_name(&reg->hr_item), reg->hr_bdev);
865
866	set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
867
868	/*
869	 * If global heartbeat active, unpin all regions if the
870	 * region count > CUT_OFF
871	 */
872	if (bitmap_weight(o2hb_quorum_region_bitmap,
873			   O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
874		o2hb_region_unpin(NULL);
875unlock:
876	spin_unlock(&o2hb_live_lock);
877}
878
879static int o2hb_check_slot(struct o2hb_region *reg,
880			   struct o2hb_disk_slot *slot)
881{
882	int changed = 0, gen_changed = 0;
883	struct o2hb_node_event event =
884		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
885	struct o2nm_node *node;
886	struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
887	u64 cputime;
888	unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
889	unsigned int slot_dead_ms;
890	int tmp;
891	int queued = 0;
892
893	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
894
895	/*
896	 * If a node is no longer configured but is still in the livemap, we
897	 * may need to clear that bit from the livemap.
898	 */
899	node = o2nm_get_node_by_num(slot->ds_node_num);
900	if (!node) {
901		spin_lock(&o2hb_live_lock);
902		tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
903		spin_unlock(&o2hb_live_lock);
904		if (!tmp)
905			return 0;
906	}
907
908	if (!o2hb_verify_crc(reg, hb_block)) {
909		/* all paths from here will drop o2hb_live_lock for
910		 * us. */
911		spin_lock(&o2hb_live_lock);
912
913		/* Don't print an error on the console in this case -
914		 * a freshly formatted heartbeat area will not have a
915		 * crc set on it. */
916		if (list_empty(&slot->ds_live_item))
917			goto out;
918
919		/* The node is live but pushed out a bad crc. We
920		 * consider it a transient miss but don't populate any
921		 * other values as they may be junk. */
922		mlog(ML_ERROR, "Node %d has written a bad crc to %pg\n",
923		     slot->ds_node_num, reg->hr_bdev);
924		o2hb_dump_slot(hb_block);
925
926		slot->ds_equal_samples++;
927		goto fire_callbacks;
928	}
929
930	/* we don't care if these wrap.. the state transitions below
931	 * clear at the right places */
932	cputime = le64_to_cpu(hb_block->hb_seq);
933	if (slot->ds_last_time != cputime)
934		slot->ds_changed_samples++;
935	else
936		slot->ds_equal_samples++;
937	slot->ds_last_time = cputime;
938
939	/* The node changed heartbeat generations. We assume this to
940	 * mean it dropped off but came back before we timed out. We
941	 * want to consider it down for the time being but don't want
942	 * to lose any changed_samples state we might build up to
943	 * considering it live again. */
944	if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
945		gen_changed = 1;
946		slot->ds_equal_samples = 0;
947		mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
948		     "to 0x%llx)\n", slot->ds_node_num,
949		     (long long)slot->ds_last_generation,
950		     (long long)le64_to_cpu(hb_block->hb_generation));
951	}
952
953	slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
954
955	mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
956	     "seq %llu last %llu changed %u equal %u\n",
957	     slot->ds_node_num, (long long)slot->ds_last_generation,
958	     le32_to_cpu(hb_block->hb_cksum),
959	     (unsigned long long)le64_to_cpu(hb_block->hb_seq),
960	     (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
961	     slot->ds_equal_samples);
962
963	spin_lock(&o2hb_live_lock);
964
965fire_callbacks:
966	/* dead nodes only come to life after some number of
967	 * changes at any time during their dead time */
968	if (list_empty(&slot->ds_live_item) &&
969	    slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
970		mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
971		     slot->ds_node_num, (long long)slot->ds_last_generation);
972
973		set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
974
975		/* first on the list generates a callback */
976		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
977			mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
978			     "bitmap\n", slot->ds_node_num);
979			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
980
981			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
982					      slot->ds_node_num);
983
984			changed = 1;
985			queued = 1;
986		}
987
988		list_add_tail(&slot->ds_live_item,
989			      &o2hb_live_slots[slot->ds_node_num]);
990
991		slot->ds_equal_samples = 0;
992
993		/* We want to be sure that all nodes agree on the
994		 * number of milliseconds before a node will be
995		 * considered dead. The self-fencing timeout is
996		 * computed from this value, and a discrepancy might
997		 * result in heartbeat calling a node dead when it
998		 * hasn't self-fenced yet. */
999		slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
1000		if (slot_dead_ms && slot_dead_ms != dead_ms) {
1001			/* TODO: Perhaps we can fail the region here. */
1002			mlog(ML_ERROR, "Node %d on device %pg has a dead count "
1003			     "of %u ms, but our count is %u ms.\n"
1004			     "Please double check your configuration values "
1005			     "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
1006			     slot->ds_node_num, reg->hr_bdev, slot_dead_ms,
1007			     dead_ms);
1008		}
1009		goto out;
1010	}
1011
1012	/* if the list is dead, we're done.. */
1013	if (list_empty(&slot->ds_live_item))
1014		goto out;
1015
	/* live nodes only go dead after enough consecutive missed
	 * samples..  reset the missed counter whenever we see
	 * activity */
1019	if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
1020		mlog(ML_HEARTBEAT, "Node %d left my region\n",
1021		     slot->ds_node_num);
1022
1023		clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1024
1025		/* last off the live_slot generates a callback */
1026		list_del_init(&slot->ds_live_item);
1027		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1028			mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
1029			     "nodes bitmap\n", slot->ds_node_num);
1030			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1031
1032			/* node can be null */
1033			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
1034					      node, slot->ds_node_num);
1035
1036			changed = 1;
1037			queued = 1;
1038		}
1039
1040		/* We don't clear this because the node is still
1041		 * actually writing new blocks. */
1042		if (!gen_changed)
1043			slot->ds_changed_samples = 0;
1044		goto out;
1045	}
1046	if (slot->ds_changed_samples) {
1047		slot->ds_changed_samples = 0;
1048		slot->ds_equal_samples = 0;
1049	}
1050out:
1051	spin_unlock(&o2hb_live_lock);
1052
1053	if (queued)
1054		o2hb_run_event_list(&event);
1055
1056	if (node)
1057		o2nm_node_put(node);
1058	return changed;
1059}
1060
/*
 * Report the position of the highest set bit among the first @numbits bits
 * of @nodes, exactly as find_last_bit() computes it (narrowed to int).
 */
static int o2hb_highest_node(unsigned long *nodes, int numbits)
{
	int highest = find_last_bit(nodes, numbits);

	return highest;
}
1065
/*
 * Report the position of the lowest set bit among the first @numbits bits
 * of @nodes, exactly as find_first_bit() computes it (narrowed to int).
 */
static int o2hb_lowest_node(unsigned long *nodes, int numbits)
{
	int lowest = find_first_bit(nodes, numbits);

	return lowest;
}
1070
/*
 * Run one heartbeat pass on @reg.
 *
 * The pass reads the on-disk slots spanning every configured or currently
 * live node, issues the write of this node's own block, evaluates each of
 * those slots via o2hb_check_slot() and then waits for the write to finish.
 * The write timeout is only re-armed when the write succeeded and
 * o2hb_check_own_slot() reported our slot as ok.
 *
 * While hr_steady_iterations is non-zero, a clean pass without a membership
 * change counts it down; every pass that still leaves it non-zero counts
 * hr_unsteady_iterations down, and when that reaches zero the start is
 * aborted.
 *
 * Returns 0 on success or a negative errno: the error from
 * o2nm_configured_node_map(), o2hb_read_slots() or o2hb_issue_node_write(),
 * -EINVAL when no node bit is set, the write completion error, or -EIO when
 * the start is aborted.
 */
static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
	int i, ret, highest_node, lowest_node;
	int membership_change = 0, own_slot_ok = 0;
	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	struct o2hb_bio_wait_ctxt write_wc;

	ret = o2nm_configured_node_map(configured_nodes,
				       sizeof(configured_nodes));
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * If a node is not configured but is in the livemap, we still need
	 * to read the slot so as to be able to remove it from the livemap.
	 */
	o2hb_fill_node_map(live_node_bitmap, O2NM_MAX_NODES);
	i = -1;
	while ((i = find_next_bit(live_node_bitmap,
				  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
		set_bit(i, configured_nodes);
	}

	/* Bound the slot range to read by the lowest and highest node bit. */
	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
	lowest_node = o2hb_lowest_node(configured_nodes, O2NM_MAX_NODES);
	if (highest_node >= O2NM_MAX_NODES || lowest_node >= O2NM_MAX_NODES) {
		mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
		ret = -EINVAL;
		goto bail;
	}

	/* No sense in reading the slots of nodes that don't exist
	 * yet. Of course, if the node definitions have holes in them
	 * then we're reading an empty slot anyway... Consider this
	 * best-effort. */
	ret = o2hb_read_slots(reg, lowest_node, highest_node + 1);
	if (ret < 0) {
		mlog_errno(ret);
		goto bail;
	}

	/* With an up to date view of the slots, we can check that no
	 * other node has been improperly configured to heartbeat in
	 * our slot. */
	own_slot_ok = o2hb_check_own_slot(reg);

	/* fill in the proper info for our next heartbeat */
	o2hb_prepare_block(reg, reg->hr_generation);

	ret = o2hb_issue_node_write(reg, &write_wc);
	if (ret < 0) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * Evaluate every slot we are tracking; the write issued above is only
	 * waited on after this loop.  Any slot reporting a change marks this
	 * pass as a membership change.
	 */
	i = -1;
	while((i = find_next_bit(configured_nodes,
				 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
		membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
	}

	/*
	 * We have to be sure we've advertised ourselves on disk
	 * before we can go to steady state.  This ensures that
	 * people we find in our steady state have seen us.
	 */
	o2hb_wait_on_io(&write_wc);
	if (write_wc.wc_error) {
		/* Do not re-arm the write timeout on I/O error - we
		 * can't be sure that the new block ever made it to
		 * disk */
		mlog(ML_ERROR, "Write error %d on device \"%pg\"\n",
		     write_wc.wc_error, reg->hr_bdev);
		ret = write_wc.wc_error;
		goto bail;
	}

	/* Skip disarming the timeout if own slot has stale/bad data */
	if (own_slot_ok) {
		o2hb_set_quorum_device(reg);
		o2hb_arm_timeout(reg);
		reg->hr_last_timeout_start = jiffies;
	}

bail:
	/* let the person who launched us know when things are steady */
	if (atomic_read(&reg->hr_steady_iterations) != 0) {
		if (!ret && own_slot_ok && !membership_change) {
			if (atomic_dec_and_test(&reg->hr_steady_iterations))
				wake_up(&o2hb_steady_queue);
		}
	}

	/*
	 * Still not steady after this pass: use up one of the unsteady
	 * iterations.  Once they run out, flag the start as aborted, wake the
	 * waiter and report -EIO.
	 */
	if (atomic_read(&reg->hr_steady_iterations) != 0) {
		if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
			printk(KERN_NOTICE "o2hb: Unable to stabilize "
			       "heartbeat on region %s (%pg)\n",
			       config_item_name(&reg->hr_item),
			       reg->hr_bdev);
			atomic_set(&reg->hr_steady_iterations, 0);
			reg->hr_aborted_start = 1;
			wake_up(&o2hb_steady_queue);
			ret = -EIO;
		}
	}

	return ret;
}
1182
/*
 * Per-region heartbeat kthread; @data is the struct o2hb_region to service.
 *
 * We ride the region ref that the region dir holds.  Before the region
 * dir is removed and drops its ref it will wait to tear down this
 * thread.
 *
 * After pinning this node with o2nm_depend_this_node(), the thread calls
 * o2hb_do_disk_heartbeat() roughly once per hr_timeout_ms until it is asked
 * to stop or the region flags an unclean stop or an aborted start.  On the
 * way out it disarms the write timeout, shuts the slots down (unless the stop
 * is unclean) and, on a clean exit, writes a block prepared with generation
 * 0.  Always returns 0.
 */
static int o2hb_thread(void *data)
{
	int i, ret;
	struct o2hb_region *reg = data;
	struct o2hb_bio_wait_ctxt write_wc;
	ktime_t before_hb, after_hb;
	unsigned int elapsed_msec;

	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");

	set_user_nice(current, MIN_NICE);

	/* Pin node */
	ret = o2nm_depend_this_node();
	if (ret) {
		mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
		/* Tell whoever waits on o2hb_steady_queue that we gave up. */
		reg->hr_node_deleted = 1;
		wake_up(&o2hb_steady_queue);
		return 0;
	}

	while (!kthread_should_stop() &&
	       !reg->hr_unclean_stop && !reg->hr_aborted_start) {
		/* We track the time spent inside
		 * o2hb_do_disk_heartbeat so that we avoid more than
		 * hr_timeout_ms between disk writes. On busy systems
		 * this should result in a heartbeat which is less
		 * likely to time itself out. */
		before_hb = ktime_get_real();

		ret = o2hb_do_disk_heartbeat(reg);
		reg->hr_last_hb_status = ret;

		after_hb = ktime_get_real();

		elapsed_msec = (unsigned int)
				ktime_ms_delta(after_hb, before_hb);

		mlog(ML_HEARTBEAT,
		     "start = %lld, end = %lld, msec = %u, ret = %d\n",
		     before_hb, after_hb, elapsed_msec, ret);

		/* Sleep only for what is left of the hr_timeout_ms period. */
		if (!kthread_should_stop() &&
		    elapsed_msec < reg->hr_timeout_ms) {
			/* the kthread api has blocked signals for us so no
			 * need to record the return value. */
			msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
		}
	}

	o2hb_disarm_timeout(reg);

	/* unclean stop is only used in very bad situation */
	for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
		o2hb_shutdown_slot(&reg->hr_slots[i]);

	/* Explicit down notification - avoid forcing the other nodes
	 * to timeout on this region when we could just as easily
	 * write a clear generation - thus indicating to them that
	 * this node has left this region.
	 */
	if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
		o2hb_prepare_block(reg, 0);
		ret = o2hb_issue_node_write(reg, &write_wc);
		if (ret == 0)
			o2hb_wait_on_io(&write_wc);
		else
			mlog_errno(ret);
	}

	/* Unpin node */
	o2nm_undepend_this_node();

	mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");

	return 0;
}
1265
1266#ifdef CONFIG_DEBUG_FS
1267static int o2hb_debug_open(struct inode *inode, struct file *file)
1268{
1269	struct o2hb_debug_buf *db = inode->i_private;
1270	struct o2hb_region *reg;
1271	unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1272	unsigned long lts;
1273	char *buf = NULL;
1274	int i = -1;
1275	int out = 0;
1276
1277	/* max_nodes should be the largest bitmap we pass here */
1278	BUG_ON(sizeof(map) < db->db_size);
1279
1280	buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1281	if (!buf)
1282		goto bail;
1283
1284	switch (db->db_type) {
1285	case O2HB_DB_TYPE_LIVENODES:
1286	case O2HB_DB_TYPE_LIVEREGIONS:
1287	case O2HB_DB_TYPE_QUORUMREGIONS:
1288	case O2HB_DB_TYPE_FAILEDREGIONS:
1289		spin_lock(&o2hb_live_lock);
1290		memcpy(map, db->db_data, db->db_size);
1291		spin_unlock(&o2hb_live_lock);
1292		break;
1293
1294	case O2HB_DB_TYPE_REGION_LIVENODES:
1295		spin_lock(&o2hb_live_lock);
1296		reg = (struct o2hb_region *)db->db_data;
1297		memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1298		spin_unlock(&o2hb_live_lock);
1299		break;
1300
1301	case O2HB_DB_TYPE_REGION_NUMBER:
1302		reg = (struct o2hb_region *)db->db_data;
1303		out += scnprintf(buf + out, PAGE_SIZE - out, "%d\n",
1304				reg->hr_region_num);
1305		goto done;
1306
1307	case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1308		reg = (struct o2hb_region *)db->db_data;
1309		lts = reg->hr_last_timeout_start;
1310		/* If 0, it has never been set before */
1311		if (lts)
1312			lts = jiffies_to_msecs(jiffies - lts);
1313		out += scnprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1314		goto done;
1315
1316	case O2HB_DB_TYPE_REGION_PINNED:
1317		reg = (struct o2hb_region *)db->db_data;
1318		out += scnprintf(buf + out, PAGE_SIZE - out, "%u\n",
1319				!!reg->hr_item_pinned);
1320		goto done;
1321
1322	default:
1323		goto done;
1324	}
1325
1326	while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1327		out += scnprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1328	out += scnprintf(buf + out, PAGE_SIZE - out, "\n");
1329
1330done:
1331	i_size_write(inode, out);
1332
1333	file->private_data = buf;
1334
1335	return 0;
1336bail:
1337	return -ENOMEM;
1338}
1339
1340static int o2hb_debug_release(struct inode *inode, struct file *file)
1341{
1342	kfree(file->private_data);
1343	return 0;
1344}
1345
1346static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1347				 size_t nbytes, loff_t *ppos)
1348{
1349	return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1350				       i_size_read(file->f_mapping->host));
1351}
1352#else
/* !CONFIG_DEBUG_FS stub: there is nothing to render, report success. */
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
	return 0;
}
/* !CONFIG_DEBUG_FS stub: nothing was allocated, so nothing to free. */
static int o2hb_debug_release(struct inode *inode, struct file *file)
{
	return 0;
}
/* !CONFIG_DEBUG_FS stub: always reports zero bytes read. */
static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
			       size_t nbytes, loff_t *ppos)
{
	return 0;
}
1366#endif  /* CONFIG_DEBUG_FS */
1367
/*
 * File operations shared by every o2hb debugfs file: open renders the text,
 * read serves it, release frees it.
 */
static const struct file_operations o2hb_debug_fops = {
	.open =		o2hb_debug_open,
	.release =	o2hb_debug_release,
	.read =		o2hb_debug_read,
	.llseek =	generic_file_llseek,
};
1374
1375void o2hb_exit(void)
1376{
1377	debugfs_remove_recursive(o2hb_debug_dir);
1378	kfree(o2hb_db_livenodes);
1379	kfree(o2hb_db_liveregions);
1380	kfree(o2hb_db_quorumregions);
1381	kfree(o2hb_db_failedregions);
1382}
1383
1384static void o2hb_debug_create(const char *name, struct dentry *dir,
1385			      struct o2hb_debug_buf **db, int db_len, int type,
1386			      int size, int len, void *data)
1387{
1388	*db = kmalloc(db_len, GFP_KERNEL);
1389	if (!*db)
1390		return;
1391
1392	(*db)->db_type = type;
1393	(*db)->db_size = size;
1394	(*db)->db_len = len;
1395	(*db)->db_data = data;
1396
1397	debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db, &o2hb_debug_fops);
1398}
1399
/*
 * Create the top-level o2hb debugfs directory and one file per global
 * bitmap: live nodes, live regions, quorum regions and failed regions.
 * Each file is given the bitmap's byte size, its bit count and a pointer to
 * the bitmap itself.
 */
static void o2hb_debug_init(void)
{
	o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);

	/* Node bitmap: O2NM_MAX_NODES bits. */
	o2hb_debug_create(O2HB_DEBUG_LIVENODES, o2hb_debug_dir,
			  &o2hb_db_livenodes, sizeof(*o2hb_db_livenodes),
			  O2HB_DB_TYPE_LIVENODES, sizeof(o2hb_live_node_bitmap),
			  O2NM_MAX_NODES, o2hb_live_node_bitmap);

	/* Region bitmaps: O2NM_MAX_REGIONS bits each. */
	o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS, o2hb_debug_dir,
			  &o2hb_db_liveregions, sizeof(*o2hb_db_liveregions),
			  O2HB_DB_TYPE_LIVEREGIONS,
			  sizeof(o2hb_live_region_bitmap), O2NM_MAX_REGIONS,
			  o2hb_live_region_bitmap);

	o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS, o2hb_debug_dir,
			  &o2hb_db_quorumregions,
			  sizeof(*o2hb_db_quorumregions),
			  O2HB_DB_TYPE_QUORUMREGIONS,
			  sizeof(o2hb_quorum_region_bitmap), O2NM_MAX_REGIONS,
			  o2hb_quorum_region_bitmap);

	o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS, o2hb_debug_dir,
			  &o2hb_db_failedregions,
			  sizeof(*o2hb_db_failedregions),
			  O2HB_DB_TYPE_FAILEDREGIONS,
			  sizeof(o2hb_failed_region_bitmap), O2NM_MAX_REGIONS,
			  o2hb_failed_region_bitmap);
}
1429
1430void o2hb_init(void)
1431{
1432	int i;
1433
1434	for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1435		INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1436
1437	for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1438		INIT_LIST_HEAD(&o2hb_live_slots[i]);
1439
1440	bitmap_zero(o2hb_live_node_bitmap, O2NM_MAX_NODES);
1441	bitmap_zero(o2hb_region_bitmap, O2NM_MAX_REGIONS);
1442	bitmap_zero(o2hb_live_region_bitmap, O2NM_MAX_REGIONS);
1443	bitmap_zero(o2hb_quorum_region_bitmap, O2NM_MAX_REGIONS);
1444	bitmap_zero(o2hb_failed_region_bitmap, O2NM_MAX_REGIONS);
1445
1446	o2hb_dependent_users = 0;
1447
1448	o2hb_debug_init();
1449}
1450
/*
 * Copy the first @bits bits of the global live-node bitmap into @map.
 *
 * Takes no locks itself: if we're already in a callback then we're already
 * serialized by the sem.
 */
static void o2hb_fill_node_map_from_callback(unsigned long *map,
					     unsigned int bits)
{
	bitmap_copy(map, o2hb_live_node_bitmap, bits);
}
1457
1458/*
1459 * get a map of all nodes that are heartbeating in any regions
1460 */
1461void o2hb_fill_node_map(unsigned long *map, unsigned int bits)
1462{
1463	/* callers want to serialize this map and callbacks so that they
1464	 * can trust that they don't miss nodes coming to the party */
1465	down_read(&o2hb_callback_sem);
1466	spin_lock(&o2hb_live_lock);
1467	o2hb_fill_node_map_from_callback(map, bits);
1468	spin_unlock(&o2hb_live_lock);
1469	up_read(&o2hb_callback_sem);
1470}
1471EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1472
1473/*
1474 * heartbeat configfs bits.  The heartbeat set is a default set under
1475 * the cluster set in nodemanager.c.
1476 */
1477
1478static struct o2hb_region *to_o2hb_region(struct config_item *item)
1479{
1480	return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1481}
1482
1483/* drop_item only drops its ref after killing the thread, nothing should
1484 * be using the region anymore.  this has to clean up any state that
1485 * attributes might have built up. */
1486static void o2hb_region_release(struct config_item *item)
1487{
1488	int i;
1489	struct page *page;
1490	struct o2hb_region *reg = to_o2hb_region(item);
1491
1492	mlog(ML_HEARTBEAT, "hb region release (%pg)\n", reg->hr_bdev);
1493
1494	kfree(reg->hr_tmp_block);
1495
1496	if (reg->hr_slot_data) {
1497		for (i = 0; i < reg->hr_num_pages; i++) {
1498			page = reg->hr_slot_data[i];
1499			if (page)
1500				__free_page(page);
1501		}
1502		kfree(reg->hr_slot_data);
1503	}
1504
1505	if (reg->hr_bdev)
1506		blkdev_put(reg->hr_bdev, NULL);
1507
1508	kfree(reg->hr_slots);
1509
1510	debugfs_remove_recursive(reg->hr_debug_dir);
1511	kfree(reg->hr_db_livenodes);
1512	kfree(reg->hr_db_regnum);
1513	kfree(reg->hr_db_elapsed_time);
1514	kfree(reg->hr_db_pinned);
1515
1516	spin_lock(&o2hb_live_lock);
1517	list_del(&reg->hr_all_item);
1518	spin_unlock(&o2hb_live_lock);
1519
1520	o2net_unregister_handler_list(&reg->hr_handler_list);
1521	kfree(reg);
1522}
1523
1524static int o2hb_read_block_input(struct o2hb_region *reg,
1525				 const char *page,
1526				 unsigned long *ret_bytes,
1527				 unsigned int *ret_bits)
1528{
1529	unsigned long bytes;
1530	char *p = (char *)page;
1531
1532	bytes = simple_strtoul(p, &p, 0);
1533	if (!p || (*p && (*p != '\n')))
1534		return -EINVAL;
1535
1536	/* Heartbeat and fs min / max block sizes are the same. */
1537	if (bytes > 4096 || bytes < 512)
1538		return -ERANGE;
1539	if (hweight16(bytes) != 1)
1540		return -EINVAL;
1541
1542	if (ret_bytes)
1543		*ret_bytes = bytes;
1544	if (ret_bits)
1545		*ret_bits = ffs(bytes) - 1;
1546
1547	return 0;
1548}
1549
1550static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1551					    char *page)
1552{
1553	return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
1554}
1555
1556static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1557					     const char *page,
1558					     size_t count)
1559{
1560	struct o2hb_region *reg = to_o2hb_region(item);
1561	int status;
1562	unsigned long block_bytes;
1563	unsigned int block_bits;
1564
1565	if (reg->hr_bdev)
1566		return -EINVAL;
1567
1568	status = o2hb_read_block_input(reg, page, &block_bytes,
1569				       &block_bits);
1570	if (status)
1571		return status;
1572
1573	reg->hr_block_bytes = (unsigned int)block_bytes;
1574	reg->hr_block_bits = block_bits;
1575
1576	return count;
1577}
1578
1579static ssize_t o2hb_region_start_block_show(struct config_item *item,
1580					    char *page)
1581{
1582	return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
1583}
1584
1585static ssize_t o2hb_region_start_block_store(struct config_item *item,
1586					     const char *page,
1587					     size_t count)
1588{
1589	struct o2hb_region *reg = to_o2hb_region(item);
1590	unsigned long long tmp;
1591	char *p = (char *)page;
1592	ssize_t ret;
1593
1594	if (reg->hr_bdev)
1595		return -EINVAL;
1596
1597	ret = kstrtoull(p, 0, &tmp);
1598	if (ret)
1599		return -EINVAL;
1600
1601	reg->hr_start_block = tmp;
1602
1603	return count;
1604}
1605
1606static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1607{
1608	return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
1609}
1610
1611static ssize_t o2hb_region_blocks_store(struct config_item *item,
1612					const char *page,
1613					size_t count)
1614{
1615	struct o2hb_region *reg = to_o2hb_region(item);
1616	unsigned long tmp;
1617	char *p = (char *)page;
1618
1619	if (reg->hr_bdev)
1620		return -EINVAL;
1621
1622	tmp = simple_strtoul(p, &p, 0);
1623	if (!p || (*p && (*p != '\n')))
1624		return -EINVAL;
1625
1626	if (tmp > O2NM_MAX_NODES || tmp == 0)
1627		return -ERANGE;
1628
1629	reg->hr_blocks = (unsigned int)tmp;
1630
1631	return count;
1632}
1633
1634static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1635{
1636	unsigned int ret = 0;
1637
1638	if (to_o2hb_region(item)->hr_bdev)
1639		ret = sprintf(page, "%pg\n", to_o2hb_region(item)->hr_bdev);
1640
1641	return ret;
1642}
1643
1644static void o2hb_init_region_params(struct o2hb_region *reg)
1645{
1646	reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1647	reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1648
1649	mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1650	     reg->hr_start_block, reg->hr_blocks);
1651	mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1652	     reg->hr_block_bytes, reg->hr_block_bits);
1653	mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1654	mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1655}
1656
1657static int o2hb_map_slot_data(struct o2hb_region *reg)
1658{
1659	int i, j;
1660	unsigned int last_slot;
1661	unsigned int spp = reg->hr_slots_per_page;
1662	struct page *page;
1663	char *raw;
1664	struct o2hb_disk_slot *slot;
1665
1666	reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1667	if (reg->hr_tmp_block == NULL)
1668		return -ENOMEM;
1669
1670	reg->hr_slots = kcalloc(reg->hr_blocks,
1671				sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1672	if (reg->hr_slots == NULL)
1673		return -ENOMEM;
1674
1675	for(i = 0; i < reg->hr_blocks; i++) {
1676		slot = &reg->hr_slots[i];
1677		slot->ds_node_num = i;
1678		INIT_LIST_HEAD(&slot->ds_live_item);
1679		slot->ds_raw_block = NULL;
1680	}
1681
1682	reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1683	mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1684			   "at %u blocks per page\n",
1685	     reg->hr_num_pages, reg->hr_blocks, spp);
1686
1687	reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1688				    GFP_KERNEL);
1689	if (!reg->hr_slot_data)
1690		return -ENOMEM;
1691
1692	for(i = 0; i < reg->hr_num_pages; i++) {
1693		page = alloc_page(GFP_KERNEL);
1694		if (!page)
1695			return -ENOMEM;
1696
1697		reg->hr_slot_data[i] = page;
1698
1699		last_slot = i * spp;
1700		raw = page_address(page);
1701		for (j = 0;
1702		     (j < spp) && ((j + last_slot) < reg->hr_blocks);
1703		     j++) {
1704			BUG_ON((j + last_slot) >= reg->hr_blocks);
1705
1706			slot = &reg->hr_slots[j + last_slot];
1707			slot->ds_raw_block =
1708				(struct o2hb_disk_heartbeat_block *) raw;
1709
1710			raw += reg->hr_block_bytes;
1711		}
1712	}
1713
1714	return 0;
1715}
1716
1717/* Read in all the slots available and populate the tracking
1718 * structures so that we can start with a baseline idea of what's
1719 * there. */
1720static int o2hb_populate_slot_data(struct o2hb_region *reg)
1721{
1722	int ret, i;
1723	struct o2hb_disk_slot *slot;
1724	struct o2hb_disk_heartbeat_block *hb_block;
1725
1726	ret = o2hb_read_slots(reg, 0, reg->hr_blocks);
1727	if (ret)
1728		goto out;
1729
1730	/* We only want to get an idea of the values initially in each
1731	 * slot, so we do no verification - o2hb_check_slot will
1732	 * actually determine if each configured slot is valid and
1733	 * whether any values have changed. */
1734	for(i = 0; i < reg->hr_blocks; i++) {
1735		slot = &reg->hr_slots[i];
1736		hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1737
1738		/* Only fill the values that o2hb_check_slot uses to
1739		 * determine changing slots */
1740		slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1741		slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1742	}
1743
1744out:
1745	return ret;
1746}
1747
/*
 * configfs store handler for the region's dev attribute.
 *
 * This is acting as commit; we set up all of hr_bdev and hr_task or nothing.
 * @page carries a file descriptor number that must refer to a block device.
 * The device is re-opened by device number for read/write, its logical block
 * size is checked against hr_block_bytes, the slot memory is allocated and
 * pre-read, and the heartbeat thread is started.  The caller then sleeps
 * until the thread reports steady state, the start is aborted, or this node
 * turns out to have been deleted.
 *
 * Returns @count on success.  Failures are -EINVAL (device already set, node
 * number not configured, bad fd or fd text, missing blocks/start_block/
 * block_bytes, not a block device, block size mismatch, node deleted), the
 * error from blkdev_get_by_dev(), o2hb_map_slot_data(),
 * o2hb_populate_slot_data() or kthread_run(), or -EIO when the start was
 * aborted, the wait was interrupted, or hr_task had been cleared by the time
 * we woke up.
 *
 * NOTE(review): on failure after o2hb_map_slot_data() only the block device
 * is released here; the slot allocations stay attached to @reg and a later
 * store would overwrite those pointers - confirm whether that is intended.
 */
static ssize_t o2hb_region_dev_store(struct config_item *item,
				     const char *page,
				     size_t count)
{
	struct o2hb_region *reg = to_o2hb_region(item);
	struct task_struct *hb_task;
	long fd;
	int sectsize;
	char *p = (char *)page;
	struct fd f;
	ssize_t ret = -EINVAL;
	int live_threshold;

	/* Already committed: the device can only be set once. */
	if (reg->hr_bdev)
		goto out;

	/* We can't heartbeat without having had our node number
	 * configured yet. */
	if (o2nm_this_node() == O2NM_MAX_NODES)
		goto out;

	/* The fd number may only be followed by end-of-string or newline. */
	fd = simple_strtol(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		goto out;

	if (fd < 0 || fd >= INT_MAX)
		goto out;

	f = fdget(fd);
	if (f.file == NULL)
		goto out;

	/* The geometry attributes must have been written before dev. */
	if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
	    reg->hr_block_bytes == 0)
		goto out2;

	if (!S_ISBLK(f.file->f_mapping->host->i_mode))
		goto out2;

	/* Take our own reference on the device rather than relying on the fd. */
	reg->hr_bdev = blkdev_get_by_dev(f.file->f_mapping->host->i_rdev,
					 BLK_OPEN_WRITE | BLK_OPEN_READ, NULL,
					 NULL);
	if (IS_ERR(reg->hr_bdev)) {
		ret = PTR_ERR(reg->hr_bdev);
		reg->hr_bdev = NULL;
		goto out2;
	}

	sectsize = bdev_logical_block_size(reg->hr_bdev);
	if (sectsize != reg->hr_block_bytes) {
		mlog(ML_ERROR,
		     "blocksize %u incorrect for device, expected %d",
		     reg->hr_block_bytes, sectsize);
		ret = -EINVAL;
		goto out3;
	}

	o2hb_init_region_params(reg);

	/* Generation of zero is invalid */
	do {
		get_random_bytes(&reg->hr_generation,
				 sizeof(reg->hr_generation));
	} while (reg->hr_generation == 0);

	ret = o2hb_map_slot_data(reg);
	if (ret) {
		mlog_errno(ret);
		goto out3;
	}

	ret = o2hb_populate_slot_data(reg);
	if (ret) {
		mlog_errno(ret);
		goto out3;
	}

	INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
	INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);

	/*
	 * A node is considered live after it has beat LIVE_THRESHOLD
	 * times.  We're not steady until we've given them a chance
	 * _after_ our first read.
	 * The default threshold is bare minimum so as to limit the delay
	 * during mounts. For global heartbeat, the threshold doubled for the
	 * first region.
	 */
	live_threshold = O2HB_LIVE_THRESHOLD;
	if (o2hb_global_heartbeat_active()) {
		spin_lock(&o2hb_live_lock);
		if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
			live_threshold <<= 1;
		spin_unlock(&o2hb_live_lock);
	}
	++live_threshold;
	atomic_set(&reg->hr_steady_iterations, live_threshold);
	/* unsteady_iterations is triple the steady_iterations */
	atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));

	hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
			      reg->hr_item.ci_name);
	if (IS_ERR(hb_task)) {
		ret = PTR_ERR(hb_task);
		mlog_errno(ret);
		goto out3;
	}

	spin_lock(&o2hb_live_lock);
	reg->hr_task = hb_task;
	spin_unlock(&o2hb_live_lock);

	/*
	 * Sleep until the thread has counted hr_steady_iterations down to
	 * zero (or someone zeroed it to abort) or the node was deleted.  A
	 * signal turns into an aborted start.
	 */
	ret = wait_event_interruptible(o2hb_steady_queue,
				atomic_read(&reg->hr_steady_iterations) == 0 ||
				reg->hr_node_deleted);
	if (ret) {
		atomic_set(&reg->hr_steady_iterations, 0);
		reg->hr_aborted_start = 1;
	}

	if (reg->hr_aborted_start) {
		ret = -EIO;
		goto out3;
	}

	if (reg->hr_node_deleted) {
		ret = -EINVAL;
		goto out3;
	}

	/* Ok, we were woken.  Make sure it wasn't by drop_item() */
	spin_lock(&o2hb_live_lock);
	hb_task = reg->hr_task;
	if (o2hb_global_heartbeat_active())
		set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
	spin_unlock(&o2hb_live_lock);

	if (hb_task)
		ret = count;
	else
		ret = -EIO;

	if (hb_task && o2hb_global_heartbeat_active())
		printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%pg)\n",
		       config_item_name(&reg->hr_item), reg->hr_bdev);

out3:
	/* Also reached on success; only undo the device open on failure. */
	if (ret < 0) {
		blkdev_put(reg->hr_bdev, NULL);
		reg->hr_bdev = NULL;
	}
out2:
	fdput(f);
out:
	return ret;
}
1905
1906static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1907{
1908	struct o2hb_region *reg = to_o2hb_region(item);
1909	pid_t pid = 0;
1910
1911	spin_lock(&o2hb_live_lock);
1912	if (reg->hr_task)
1913		pid = task_pid_nr(reg->hr_task);
1914	spin_unlock(&o2hb_live_lock);
1915
1916	if (!pid)
1917		return 0;
1918
1919	return sprintf(page, "%u\n", pid);
1920}
1921
/*
 * configfs attributes of a heartbeat region.  Each declaration produces the
 * o2hb_region_attr_<name> object listed in the table below; pid is declared
 * read-only.
 */
CONFIGFS_ATTR(o2hb_region_, block_bytes);
CONFIGFS_ATTR(o2hb_region_, start_block);
CONFIGFS_ATTR(o2hb_region_, blocks);
CONFIGFS_ATTR(o2hb_region_, dev);
CONFIGFS_ATTR_RO(o2hb_region_, pid);

/* NULL-terminated attribute table referenced by o2hb_region_type. */
static struct configfs_attribute *o2hb_region_attrs[] = {
	&o2hb_region_attr_block_bytes,
	&o2hb_region_attr_start_block,
	&o2hb_region_attr_blocks,
	&o2hb_region_attr_dev,
	&o2hb_region_attr_pid,
	NULL,
};
1936
/* Item operations for a region: only a release callback is provided. */
static struct configfs_item_operations o2hb_region_item_ops = {
	.release		= o2hb_region_release,
};

/* configfs item type tying the region's operations and attributes together. */
static const struct config_item_type o2hb_region_type = {
	.ct_item_ops	= &o2hb_region_item_ops,
	.ct_attrs	= o2hb_region_attrs,
	.ct_owner	= THIS_MODULE,
};
1946
/* heartbeat set */

/*
 * Wrapper around the configfs group under which heartbeat regions are
 * created; it currently carries nothing besides the group itself.
 */
struct o2hb_heartbeat_group {
	struct config_group hs_group;
	/* some stuff? */
};
1953
1954static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1955{
1956	return group ?
1957		container_of(group, struct o2hb_heartbeat_group, hs_group)
1958		: NULL;
1959}
1960
/*
 * Create the per-region debugfs directory (named after the region's config
 * item) under @parent and populate it with four files: the region's live
 * node bitmap, its region number, the elapsed-time value and the pinned
 * flag.  Every file is created with @reg itself as its data pointer.
 */
static void o2hb_debug_region_init(struct o2hb_region *reg,
				   struct dentry *parent)
{
	struct dentry *dir;

	dir = debugfs_create_dir(config_item_name(&reg->hr_item), parent);
	reg->hr_debug_dir = dir;

	/* Bitmap file: size and bit count describe hr_live_node_bitmap. */
	o2hb_debug_create(O2HB_DEBUG_LIVENODES, dir, &(reg->hr_db_livenodes),
			  sizeof(*(reg->hr_db_livenodes)),
			  O2HB_DB_TYPE_REGION_LIVENODES,
			  sizeof(reg->hr_live_node_bitmap), O2NM_MAX_NODES,
			  reg);

	/* Scalar files: the size argument is 0 because no bitmap is copied. */
	o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER, dir, &(reg->hr_db_regnum),
			  sizeof(*(reg->hr_db_regnum)),
			  O2HB_DB_TYPE_REGION_NUMBER, 0, O2NM_MAX_NODES, reg);

	o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME, dir,
			  &(reg->hr_db_elapsed_time),
			  sizeof(*(reg->hr_db_elapsed_time)),
			  O2HB_DB_TYPE_REGION_ELAPSED_TIME, 0, 0, reg);

	o2hb_debug_create(O2HB_DEBUG_REGION_PINNED, dir, &(reg->hr_db_pinned),
			  sizeof(*(reg->hr_db_pinned)),
			  O2HB_DB_TYPE_REGION_PINNED, 0, 0, reg);

}
1989
/*
 * Create a new heartbeat region named @name.
 *
 * Allocates the region, assigns it a region number (the first free bit of
 * o2hb_region_bitmap under global heartbeat, 0 otherwise), links it on
 * o2hb_all_regions, derives its message key, registers the two negotiation
 * message handlers and creates its debugfs entries.
 *
 * Returns the region's config_item, or an ERR_PTR: -ENOMEM on allocation
 * failure, -ENAMETOOLONG when @name exceeds O2HB_MAX_REGION_NAME_LEN, -EFBIG
 * when no region number is free, or the o2net_register_handler() error.
 */
static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
							  const char *name)
{
	struct o2hb_region *reg = NULL;
	int ret;

	reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
	if (reg == NULL)
		return ERR_PTR(-ENOMEM);

	if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto free;
	}

	/* Region number and list membership are set up under o2hb_live_lock. */
	spin_lock(&o2hb_live_lock);
	reg->hr_region_num = 0;
	if (o2hb_global_heartbeat_active()) {
		reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
							 O2NM_MAX_REGIONS);
		if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
			spin_unlock(&o2hb_live_lock);
			ret = -EFBIG;
			goto free;
		}
		set_bit(reg->hr_region_num, o2hb_region_bitmap);
	}
	list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
	spin_unlock(&o2hb_live_lock);

	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);

	/* this is the same way to generate msg key as dlm, for local heartbeat,
	 * name is also the same, so make initial crc value different to avoid
	 * message key conflict.
	 */
	reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
		name, strlen(name));
	INIT_LIST_HEAD(&reg->hr_handler_list);
	ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
			sizeof(struct o2hb_nego_msg),
			o2hb_nego_timeout_handler,
			reg, NULL, &reg->hr_handler_list);
	if (ret)
		goto remove_item;

	ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
			sizeof(struct o2hb_nego_msg),
			o2hb_nego_approve_handler,
			reg, NULL, &reg->hr_handler_list);
	if (ret)
		goto unregister_handler;

	o2hb_debug_region_init(reg, o2hb_debug_dir);

	return &reg->hr_item;

	/* Error unwinding, in reverse order of setup. */
unregister_handler:
	o2net_unregister_handler_list(&reg->hr_handler_list);
remove_item:
	spin_lock(&o2hb_live_lock);
	list_del(&reg->hr_all_item);
	if (o2hb_global_heartbeat_active())
		clear_bit(reg->hr_region_num, o2hb_region_bitmap);
	spin_unlock(&o2hb_live_lock);
free:
	kfree(reg);
	return ERR_PTR(ret);
}
2059
2060static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2061					   struct config_item *item)
2062{
2063	struct task_struct *hb_task;
2064	struct o2hb_region *reg = to_o2hb_region(item);
2065	int quorum_region = 0;
2066
2067	/* stop the thread when the user removes the region dir */
2068	spin_lock(&o2hb_live_lock);
2069	hb_task = reg->hr_task;
2070	reg->hr_task = NULL;
2071	reg->hr_item_dropped = 1;
2072	spin_unlock(&o2hb_live_lock);
2073
2074	if (hb_task)
2075		kthread_stop(hb_task);
2076
2077	if (o2hb_global_heartbeat_active()) {
2078		spin_lock(&o2hb_live_lock);
2079		clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2080		clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2081		if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2082			quorum_region = 1;
2083		clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2084		spin_unlock(&o2hb_live_lock);
2085		printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%pg)\n",
2086		       ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2087			"stopped" : "start aborted"), config_item_name(item),
2088		       reg->hr_bdev);
2089	}
2090
2091	/*
2092	 * If we're racing a dev_write(), we need to wake them.  They will
2093	 * check reg->hr_task
2094	 */
2095	if (atomic_read(&reg->hr_steady_iterations) != 0) {
2096		reg->hr_aborted_start = 1;
2097		atomic_set(&reg->hr_steady_iterations, 0);
2098		wake_up(&o2hb_steady_queue);
2099	}
2100
2101	config_item_put(item);
2102
2103	if (!o2hb_global_heartbeat_active() || !quorum_region)
2104		return;
2105
2106	/*
2107	 * If global heartbeat active and there are dependent users,
2108	 * pin all regions if quorum region count <= CUT_OFF
2109	 */
2110	spin_lock(&o2hb_live_lock);
2111
2112	if (!o2hb_dependent_users)
2113		goto unlock;
2114
2115	if (bitmap_weight(o2hb_quorum_region_bitmap,
2116			   O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2117		o2hb_region_pin(NULL);
2118
2119unlock:
2120	spin_unlock(&o2hb_live_lock);
2121}
2122
2123static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item,
2124		char *page)
2125{
2126	return sprintf(page, "%u\n", o2hb_dead_threshold);
2127}
2128
/*
 * configfs store handler for the "dead_threshold" attribute.
 *
 * @page must hold a NUL-terminated unsigned decimal number, optionally
 * followed by a single newline.  The parsed value is handed to
 * o2hb_dead_threshold_set().
 *
 * Returns @count on success, -EINVAL if @page is not a valid number
 * (including an empty string) and -ERANGE if the value does not fit in an
 * unsigned int.  Earlier code parsed into an unsigned long and silently
 * truncated it to unsigned int, so an oversized value could alias a
 * valid-looking threshold.
 */
static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item,
		const char *page, size_t count)
{
	unsigned int threshold;
	int ret;

	/* kstrtouint() rejects trailing garbage and overflow for us. */
	ret = kstrtouint(page, 10, &threshold);
	if (ret)
		return ret;

	/* this will validate ranges for us. */
	o2hb_dead_threshold_set(threshold);

	return count;
}
2144
2145static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2146		char *page)
2147{
2148	return sprintf(page, "%s\n",
2149		       o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2150}
2151
2152static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2153		const char *page, size_t count)
2154{
2155	unsigned int i;
2156	int ret;
2157	size_t len;
2158
2159	len = (page[count - 1] == '\n') ? count - 1 : count;
2160	if (!len)
2161		return -EINVAL;
2162
2163	for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2164		if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2165			continue;
2166
2167		ret = o2hb_global_heartbeat_mode_set(i);
2168		if (!ret)
2169			printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2170			       o2hb_heartbeat_mode_desc[i]);
2171		return count;
2172	}
2173
2174	return -EINVAL;
2175
2176}
2177
2178CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold);
2179CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);
2180
2181static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
2182	&o2hb_heartbeat_group_attr_dead_threshold,
2183	&o2hb_heartbeat_group_attr_mode,
2184	NULL,
2185};
2186
2187static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
2188	.make_item	= o2hb_heartbeat_group_make_item,
2189	.drop_item	= o2hb_heartbeat_group_drop_item,
2190};
2191
2192static const struct config_item_type o2hb_heartbeat_group_type = {
2193	.ct_group_ops	= &o2hb_heartbeat_group_group_ops,
2194	.ct_attrs	= o2hb_heartbeat_group_attrs,
2195	.ct_owner	= THIS_MODULE,
2196};
2197
2198/* this is just here to avoid touching group in heartbeat.h which the
2199 * entire damn world #includes */
2200struct config_group *o2hb_alloc_hb_set(void)
2201{
2202	struct o2hb_heartbeat_group *hs = NULL;
2203	struct config_group *ret = NULL;
2204
2205	hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2206	if (hs == NULL)
2207		goto out;
2208
2209	config_group_init_type_name(&hs->hs_group, "heartbeat",
2210				    &o2hb_heartbeat_group_type);
2211
2212	ret = &hs->hs_group;
2213out:
2214	if (ret == NULL)
2215		kfree(hs);
2216	return ret;
2217}
2218
/*
 * Free a heartbeat group obtained from o2hb_alloc_hb_set(): map @group
 * back to its containing o2hb_heartbeat_group and kfree() it.
 */
void o2hb_free_hb_set(struct config_group *group)
{
	kfree(to_o2hb_heartbeat_group(group));
}
2224
2225/* hb callback registration and issuing */
2226
2227static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2228{
2229	if (type == O2HB_NUM_CB)
2230		return ERR_PTR(-EINVAL);
2231
2232	return &o2hb_callbacks[type];
2233}
2234
2235void o2hb_setup_callback(struct o2hb_callback_func *hc,
2236			 enum o2hb_callback_type type,
2237			 o2hb_cb_func *func,
2238			 void *data,
2239			 int priority)
2240{
2241	INIT_LIST_HEAD(&hc->hc_item);
2242	hc->hc_func = func;
2243	hc->hc_data = data;
2244	hc->hc_priority = priority;
2245	hc->hc_type = type;
2246	hc->hc_magic = O2HB_CB_MAGIC;
2247}
2248EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2249
2250/*
2251 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2252 * In global heartbeat mode, region_uuid passed is NULL.
2253 *
2254 * In local, we only pin the matching region. In global we pin all the active
2255 * regions.
2256 */
2257static int o2hb_region_pin(const char *region_uuid)
2258{
2259	int ret = 0, found = 0;
2260	struct o2hb_region *reg;
2261	char *uuid;
2262
2263	assert_spin_locked(&o2hb_live_lock);
2264
2265	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2266		if (reg->hr_item_dropped)
2267			continue;
2268
2269		uuid = config_item_name(&reg->hr_item);
2270
2271		/* local heartbeat */
2272		if (region_uuid) {
2273			if (strcmp(region_uuid, uuid))
2274				continue;
2275			found = 1;
2276		}
2277
2278		if (reg->hr_item_pinned || reg->hr_item_dropped)
2279			goto skip_pin;
2280
2281		/* Ignore ENOENT only for local hb (userdlm domain) */
2282		ret = o2nm_depend_item(&reg->hr_item);
2283		if (!ret) {
2284			mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2285			reg->hr_item_pinned = 1;
2286		} else {
2287			if (ret == -ENOENT && found)
2288				ret = 0;
2289			else {
2290				mlog(ML_ERROR, "Pin region %s fails with %d\n",
2291				     uuid, ret);
2292				break;
2293			}
2294		}
2295skip_pin:
2296		if (found)
2297			break;
2298	}
2299
2300	return ret;
2301}
2302
2303/*
2304 * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2305 * In global heartbeat mode, region_uuid passed is NULL.
2306 *
2307 * In local, we only unpin the matching region. In global we unpin all the
2308 * active regions.
2309 */
2310static void o2hb_region_unpin(const char *region_uuid)
2311{
2312	struct o2hb_region *reg;
2313	char *uuid;
2314	int found = 0;
2315
2316	assert_spin_locked(&o2hb_live_lock);
2317
2318	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2319		if (reg->hr_item_dropped)
2320			continue;
2321
2322		uuid = config_item_name(&reg->hr_item);
2323		if (region_uuid) {
2324			if (strcmp(region_uuid, uuid))
2325				continue;
2326			found = 1;
2327		}
2328
2329		if (reg->hr_item_pinned) {
2330			mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2331			o2nm_undepend_item(&reg->hr_item);
2332			reg->hr_item_pinned = 0;
2333		}
2334		if (found)
2335			break;
2336	}
2337}
2338
2339static int o2hb_region_inc_user(const char *region_uuid)
2340{
2341	int ret = 0;
2342
2343	spin_lock(&o2hb_live_lock);
2344
2345	/* local heartbeat */
2346	if (!o2hb_global_heartbeat_active()) {
2347	    ret = o2hb_region_pin(region_uuid);
2348	    goto unlock;
2349	}
2350
2351	/*
2352	 * if global heartbeat active and this is the first dependent user,
2353	 * pin all regions if quorum region count <= CUT_OFF
2354	 */
2355	o2hb_dependent_users++;
2356	if (o2hb_dependent_users > 1)
2357		goto unlock;
2358
2359	if (bitmap_weight(o2hb_quorum_region_bitmap,
2360			   O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2361		ret = o2hb_region_pin(NULL);
2362
2363unlock:
2364	spin_unlock(&o2hb_live_lock);
2365	return ret;
2366}
2367
2368static void o2hb_region_dec_user(const char *region_uuid)
2369{
2370	spin_lock(&o2hb_live_lock);
2371
2372	/* local heartbeat */
2373	if (!o2hb_global_heartbeat_active()) {
2374	    o2hb_region_unpin(region_uuid);
2375	    goto unlock;
2376	}
2377
2378	/*
2379	 * if global heartbeat active and there are no dependent users,
2380	 * unpin all quorum regions
2381	 */
2382	o2hb_dependent_users--;
2383	if (!o2hb_dependent_users)
2384		o2hb_region_unpin(NULL);
2385
2386unlock:
2387	spin_unlock(&o2hb_live_lock);
2388}
2389
2390int o2hb_register_callback(const char *region_uuid,
2391			   struct o2hb_callback_func *hc)
2392{
2393	struct o2hb_callback_func *f;
2394	struct o2hb_callback *hbcall;
2395	int ret;
2396
2397	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2398	BUG_ON(!list_empty(&hc->hc_item));
2399
2400	hbcall = hbcall_from_type(hc->hc_type);
2401	if (IS_ERR(hbcall)) {
2402		ret = PTR_ERR(hbcall);
2403		goto out;
2404	}
2405
2406	if (region_uuid) {
2407		ret = o2hb_region_inc_user(region_uuid);
2408		if (ret) {
2409			mlog_errno(ret);
2410			goto out;
2411		}
2412	}
2413
2414	down_write(&o2hb_callback_sem);
2415
2416	list_for_each_entry(f, &hbcall->list, hc_item) {
2417		if (hc->hc_priority < f->hc_priority) {
2418			list_add_tail(&hc->hc_item, &f->hc_item);
2419			break;
2420		}
2421	}
2422	if (list_empty(&hc->hc_item))
2423		list_add_tail(&hc->hc_item, &hbcall->list);
2424
2425	up_write(&o2hb_callback_sem);
2426	ret = 0;
2427out:
2428	mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2429	     ret, __builtin_return_address(0), hc);
2430	return ret;
2431}
2432EXPORT_SYMBOL_GPL(o2hb_register_callback);
2433
2434void o2hb_unregister_callback(const char *region_uuid,
2435			      struct o2hb_callback_func *hc)
2436{
2437	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2438
2439	mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2440	     __builtin_return_address(0), hc);
2441
2442	/* XXX Can this happen _with_ a region reference? */
2443	if (list_empty(&hc->hc_item))
2444		return;
2445
2446	if (region_uuid)
2447		o2hb_region_dec_user(region_uuid);
2448
2449	down_write(&o2hb_callback_sem);
2450
2451	list_del_init(&hc->hc_item);
2452
2453	up_write(&o2hb_callback_sem);
2454}
2455EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
2456
2457int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2458{
2459	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2460
2461	spin_lock(&o2hb_live_lock);
2462	o2hb_fill_node_map_from_callback(testing_map, O2NM_MAX_NODES);
2463	spin_unlock(&o2hb_live_lock);
2464	if (!test_bit(node_num, testing_map)) {
2465		mlog(ML_HEARTBEAT,
2466		     "node (%u) does not have heartbeating enabled.\n",
2467		     node_num);
2468		return 0;
2469	}
2470
2471	return 1;
2472}
2473EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
2474
2475int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2476{
2477	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2478
2479	o2hb_fill_node_map_from_callback(testing_map, O2NM_MAX_NODES);
2480	if (!test_bit(node_num, testing_map)) {
2481		mlog(ML_HEARTBEAT,
2482		     "node (%u) does not have heartbeating enabled.\n",
2483		     node_num);
2484		return 0;
2485	}
2486
2487	return 1;
2488}
2489EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2490
2491/*
2492 * this is just a hack until we get the plumbing which flips file systems
2493 * read only and drops the hb ref instead of killing the node dead.
2494 */
2495void o2hb_stop_all_regions(void)
2496{
2497	struct o2hb_region *reg;
2498
2499	mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2500
2501	spin_lock(&o2hb_live_lock);
2502
2503	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2504		reg->hr_unclean_stop = 1;
2505
2506	spin_unlock(&o2hb_live_lock);
2507}
2508EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2509
2510int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2511{
2512	struct o2hb_region *reg;
2513	int numregs = 0;
2514	char *p;
2515
2516	spin_lock(&o2hb_live_lock);
2517
2518	p = region_uuids;
2519	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2520		if (reg->hr_item_dropped)
2521			continue;
2522
2523		mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2524		if (numregs < max_regions) {
2525			memcpy(p, config_item_name(&reg->hr_item),
2526			       O2HB_MAX_REGION_NAME_LEN);
2527			p += O2HB_MAX_REGION_NAME_LEN;
2528		}
2529		numregs++;
2530	}
2531
2532	spin_unlock(&o2hb_live_lock);
2533
2534	return numregs;
2535}
2536EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2537
2538int o2hb_global_heartbeat_active(void)
2539{
2540	return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2541}
2542EXPORT_SYMBOL(o2hb_global_heartbeat_active);
2543