Lines Matching defs:dlm

38 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
41 static int dlm_do_recovery(struct dlm_ctxt *dlm);
43 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
44 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node);
45 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node);
46 static int dlm_request_all_locks(struct dlm_ctxt *dlm,
48 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm);
55 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
60 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
63 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
64 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
66 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node);
67 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
69 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
77 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
100 static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
103 assert_spin_locked(&dlm->spinlock);
104 if (dlm->reco.dead_node != dead_node)
106 dlm->name, dlm->reco.dead_node, dead_node);
107 dlm->reco.dead_node = dead_node;
110 static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
113 assert_spin_locked(&dlm->spinlock);
115 dlm->name, dlm->reco.new_master, master);
116 dlm->reco.new_master = master;
119 static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
121 assert_spin_locked(&dlm->spinlock);
122 clear_bit(dlm->reco.dead_node, dlm->recovery_map);
123 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
124 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
130 struct dlm_ctxt *dlm =
137 spin_lock(&dlm->work_lock);
138 list_splice_init(&dlm->work_list, &tmp_list);
139 spin_unlock(&dlm->work_lock);
144 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
150 /* already have ref on dlm to avoid having
152 BUG_ON(item->dlm != dlm);
158 dlm_put(dlm);
167 void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
175 wake_up(&dlm->dlm_reco_thread_wq);
179 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm)
181 mlog(0, "starting dlm recovery thread...\n");
183 dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm,
184 "dlm_reco-%s", dlm->name);
185 if (IS_ERR(dlm->dlm_reco_thread_task)) {
186 mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task));
187 dlm->dlm_reco_thread_task = NULL;
194 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
196 if (dlm->dlm_reco_thread_task) {
197 mlog(0, "waiting for dlm recovery thread to exit\n");
198 kthread_stop(dlm->dlm_reco_thread_task);
199 dlm->dlm_reco_thread_task = NULL;
228 static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
234 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
235 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
236 dlm->reco.dead_node, dlm->reco.new_master);
238 list_for_each_entry(ndata, &dlm->reco.node_data, list) {
267 dlm->name, ndata->node_num, st);
269 list_for_each_entry(res, &dlm->reco.resources, recovering) {
271 dlm->name, res->lockname.len, res->lockname.name);
280 struct dlm_ctxt *dlm = data;
283 mlog(0, "dlm thread running for %s...\n", dlm->name);
286 if (dlm_domain_fully_joined(dlm)) {
287 status = dlm_do_recovery(dlm);
296 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
306 static int dlm_reco_master_ready(struct dlm_ctxt *dlm)
309 spin_lock(&dlm->spinlock);
310 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM);
311 spin_unlock(&dlm->spinlock);
317 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
320 spin_lock(&dlm->spinlock);
321 dead = !test_bit(node, dlm->domain_map);
322 spin_unlock(&dlm->spinlock);
328 static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
331 spin_lock(&dlm->spinlock);
332 recovered = !test_bit(node, dlm->recovery_map);
333 spin_unlock(&dlm->spinlock);
338 void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
340 if (dlm_is_node_dead(dlm, node))
344 "domain %s\n", node, dlm->name);
347 wait_event_timeout(dlm->dlm_reco_thread_wq,
348 dlm_is_node_dead(dlm, node),
351 wait_event(dlm->dlm_reco_thread_wq,
352 dlm_is_node_dead(dlm, node));
355 void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
357 if (dlm_is_node_recovered(dlm, node))
361 "domain %s\n", node, dlm->name);
364 wait_event_timeout(dlm->dlm_reco_thread_wq,
365 dlm_is_node_recovered(dlm, node),
368 wait_event(dlm->dlm_reco_thread_wq,
369 dlm_is_node_recovered(dlm, node));
373 * block on the dlm->reco.event when recovery is in progress.
374 * the dlm recovery thread will set this state when it begins
378 static int dlm_in_recovery(struct dlm_ctxt *dlm)
381 spin_lock(&dlm->spinlock);
382 in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
383 spin_unlock(&dlm->spinlock);
388 void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
390 if (dlm_in_recovery(dlm)) {
393 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
394 dlm->reco.state, dlm->reco.new_master,
395 dlm->reco.dead_node);
397 wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
400 static void dlm_begin_recovery(struct dlm_ctxt *dlm)
402 assert_spin_locked(&dlm->spinlock);
403 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE);
405 dlm->name, dlm->reco.dead_node);
406 dlm->reco.state |= DLM_RECO_STATE_ACTIVE;
409 static void dlm_end_recovery(struct dlm_ctxt *dlm)
411 spin_lock(&dlm->spinlock);
412 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));
413 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;
414 spin_unlock(&dlm->spinlock);
415 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name);
416 wake_up(&dlm->reco.event);
419 static void dlm_print_recovery_master(struct dlm_ctxt *dlm)
422 "dead node %u in domain %s\n", dlm->reco.new_master,
423 (dlm->node_num == dlm->reco.new_master ? "me" : "he"),
424 dlm->reco.dead_node, dlm->name);
427 static int dlm_do_recovery(struct dlm_ctxt *dlm)
432 spin_lock(&dlm->spinlock);
434 if (dlm->migrate_done) {
436 "lock resources\n", dlm->name);
437 spin_unlock(&dlm->spinlock);
442 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM &&
443 test_bit(dlm->reco.new_master, dlm->recovery_map)) {
445 dlm->reco.new_master, dlm->reco.dead_node);
447 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
451 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
454 bit = find_first_bit(dlm->recovery_map, O2NM_MAX_NODES);
456 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
458 dlm_set_reco_dead_node(dlm, bit);
459 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
462 dlm->reco.dead_node);
463 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
466 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
468 spin_unlock(&dlm->spinlock);
473 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task),
474 dlm->reco.dead_node);
478 dlm_begin_recovery(dlm);
480 spin_unlock(&dlm->spinlock);
482 if (dlm->reco.new_master == dlm->node_num)
485 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
490 ret = dlm_pick_recovery_master(dlm);
498 dlm_print_recovery_master(dlm);
503 dlm_end_recovery(dlm);
509 dlm_print_recovery_master(dlm);
511 status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
515 "retrying.\n", dlm->name, status, dlm->reco.dead_node);
522 dlm->name, dlm->reco.dead_node, dlm->node_num);
523 spin_lock(&dlm->spinlock);
524 __dlm_reset_recovery(dlm);
525 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
526 spin_unlock(&dlm->spinlock);
528 dlm_end_recovery(dlm);
534 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
545 status = dlm_init_recovery_area(dlm, dead_node);
548 "retrying\n", dlm->name);
555 list_for_each_entry(ndata, &dlm->reco.node_data, list) {
559 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name,
562 if (ndata->node_num == dlm->node_num) {
568 status = dlm_request_all_locks(dlm, ndata->node_num,
578 wait_event_timeout(dlm->dlm_reco_thread_wq,
579 dlm_is_node_dead(dlm,
584 str_yes_no(dlm_is_node_dead(dlm, ndata->node_num)));
590 dlm->name, ndata->node_num,
631 mlog(0, "%s: Done requesting all lock info\n", dlm->name);
641 list_for_each_entry(ndata, &dlm->reco.node_data, list) {
661 dlm->name, ndata->node_num,
668 dlm->name, ndata->node_num);
672 dlm->name, ndata->node_num);
687 spin_lock(&dlm->spinlock);
688 dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
689 spin_unlock(&dlm->spinlock);
695 ret = dlm_send_finalize_reco_message(dlm);
699 spin_lock(&dlm->spinlock);
700 dlm_finish_local_lockres_recovery(dlm, dead_node,
701 dlm->node_num);
702 spin_unlock(&dlm->spinlock);
706 "dead=%u, this=%u, new=%u\n", dlm->name,
707 jiffies, dlm->reco.dead_node,
708 dlm->node_num, dlm->reco.new_master);
712 dlm_kick_thread(dlm, NULL);
717 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,
724 dlm_destroy_recovery_area(dlm);
729 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
734 spin_lock(&dlm->spinlock);
735 bitmap_copy(dlm->reco.node_map, dlm->domain_map, O2NM_MAX_NODES);
738 spin_unlock(&dlm->spinlock);
741 num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num);
749 dlm_destroy_recovery_area(dlm);
755 list_add_tail(&ndata->list, &dlm->reco.node_data);
763 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm)
769 list_splice_init(&dlm->reco.node_data, &tmplist);
778 static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,
792 lr.node_idx = dlm->node_num;
796 ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key,
802 "to recover dead node %u\n", dlm->name, ret,
815 struct dlm_ctxt *dlm = data;
820 if (!dlm_grab(dlm))
823 if (lr->dead_node != dlm->reco.dead_node) {
825 "dead_node is %u\n", dlm->name, lr->node_idx,
826 lr->dead_node, dlm->reco.dead_node);
827 dlm_print_reco_node_status(dlm);
829 dlm_put(dlm);
832 BUG_ON(lr->dead_node != dlm->reco.dead_node);
836 dlm_put(dlm);
844 dlm_put(dlm);
849 dlm_grab(dlm); /* get an extra ref for the work item */
850 dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf);
853 spin_lock(&dlm->work_lock);
854 list_add_tail(&item->list, &dlm->work_list);
855 spin_unlock(&dlm->work_lock);
856 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
858 dlm_put(dlm);
866 struct dlm_ctxt *dlm;
872 dlm = item->dlm;
878 dlm->name, dead_node, reco_master);
880 if (dead_node != dlm->reco.dead_node ||
881 reco_master != dlm->reco.new_master) {
884 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
887 " current=(dead=%u,mas=%u)\n", dlm->name,
889 dlm->reco.dead_node, dlm->reco.new_master);
893 dlm->name, dlm->reco.dead_node,
894 dlm->reco.new_master, dead_node, reco_master);
900 * dlm->reco.resources list. now move items from that list
905 dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
907 /* now we can begin blasting lockreses without the dlm lock */
912 ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
916 "recovery state for dead node %u, ret=%d\n", dlm->name,
924 spin_lock(&dlm->spinlock);
925 list_splice_init(&resources, &dlm->reco.resources);
926 spin_unlock(&dlm->spinlock);
929 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
933 dlm->name, reco_master, dead_node, ret);
941 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
947 done_msg.node_idx = dlm->node_num;
953 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
957 "to recover dead node %u\n", dlm->name, ret, send_to,
971 struct dlm_ctxt *dlm = data;
976 if (!dlm_grab(dlm))
981 dlm->reco.dead_node, done->node_idx, dlm->node_num);
983 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
986 dlm->reco.dead_node, done->node_idx, dlm->node_num);
989 list_for_each_entry(ndata, &dlm->reco.node_data, list) {
1022 dlm_kick_recovery_thread(dlm);
1027 dlm_put(dlm);
1033 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
1040 spin_lock(&dlm->spinlock);
1041 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
1052 dead_node, dlm->name);
1076 spin_unlock(&dlm->spinlock);
1094 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
1117 dlm->name, res->lockname.len, res->lockname.name,
1122 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
1129 "node %u (%s)\n", dlm->name, mres->lockname_len,
1230 static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
1240 dummy.ml.node = dlm->node_num;
1244 static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
1259 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1297 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
1306 dlm->name, res->lockname.len, res->lockname.name,
1309 dlm_add_dummy_lock(dlm, mres);
1312 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
1319 dlm->name, ret);
1323 "lockres %.*s\n", dlm->name, send_to,
1347 struct dlm_ctxt *dlm = data;
1358 if (!dlm_grab(dlm))
1361 if (!dlm_joined(dlm)) {
1364 dlm->name, mres->lockname_len,
1366 dlm_put(dlm);
1394 spin_lock(&dlm->spinlock);
1395 res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len,
1404 " ref!\n", dlm->name,
1408 spin_unlock(&dlm->spinlock);
1429 spin_unlock(&dlm->spinlock);
1436 spin_unlock(&dlm->spinlock);
1438 spin_unlock(&dlm->spinlock);
1441 res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len);
1455 spin_lock(&dlm->spinlock);
1456 __dlm_insert_lockres(dlm, res);
1457 spin_unlock(&dlm->spinlock);
1490 dlm_lockres_grab_inflight_ref(dlm, res);
1500 dlm_change_lockres_owner(dlm, res, dlm->node_num);
1505 dlm_grab(dlm); /* get an extra ref for the work item */
1507 dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf);
1511 spin_lock(&dlm->work_lock);
1512 list_add_tail(&item->list, &dlm->work_list);
1513 spin_unlock(&dlm->work_lock);
1514 queue_work(dlm->dlm_worker, &dlm->dispatched_work);
1521 dlm_put(dlm);
1534 struct dlm_ctxt *dlm;
1541 dlm = item->dlm;
1552 ret = dlm_lockres_master_requery(dlm, res, &real_master);
1564 dlm_lockres_drop_inflight_ref(dlm, res);
1575 ret = dlm_process_recovery_data(dlm, res, mres);
1583 ret = dlm_finish_migration(dlm, res, mres->master);
1600 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
1633 spin_lock(&dlm->spinlock);
1634 dlm_node_iter_init(dlm->domain_map, &iter);
1635 spin_unlock(&dlm->spinlock);
1639 if (nodenum == dlm->node_num)
1641 ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
1658 int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1666 req.node_idx = dlm->node_num;
1671 ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key,
1676 dlm->key, nodenum);
1699 struct dlm_ctxt *dlm = data;
1707 if (!dlm_grab(dlm)) {
1715 spin_lock(&dlm->spinlock);
1716 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
1720 if (master == dlm->node_num) {
1721 int ret = dlm_dispatch_assert_master(dlm, res,
1727 spin_unlock(&dlm->spinlock);
1728 dlm_put(dlm);
1733 __dlm_lockres_grab_inflight_worker(dlm, res);
1742 spin_unlock(&dlm->spinlock);
1745 dlm_put(dlm);
1786 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
1805 if (dlm_is_dummy_lock(dlm, ml, &from)) {
1809 dlm->name, mres->lockname_len, mres->lockname,
1812 dlm_lockres_set_refmap_bit(dlm, res, from);
1826 if (ml->node == dlm->node_num) {
1954 "lvb! type=%d\n", dlm->name,
1994 "exists on this lockres!\n", dlm->name,
2023 "setting refmap bit\n", dlm->name,
2025 dlm_lockres_set_refmap_bit(dlm, res, ml->node);
2034 dlm_lockres_drop_inflight_ref(dlm, res);
2043 void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
2050 assert_spin_locked(&dlm->spinlock);
2056 dlm->name, res->lockname.len, res->lockname.name);
2062 list_add_tail(&res->recovering, &dlm->reco.resources);
2127 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
2134 assert_spin_locked(&dlm->spinlock);
2136 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
2139 dlm->name, res->lockname.len, res->lockname.name,
2145 dlm_change_lockres_owner(dlm, res, new_master);
2148 __dlm_dirty_lockres(dlm, res);
2160 bucket = dlm_lockres_hash(dlm, i);
2173 res->owner != dlm->node_num)
2184 dlm->name, res->lockname.len, res->lockname.name,
2187 dlm_change_lockres_owner(dlm, res, new_master);
2190 __dlm_dirty_lockres(dlm, res);
2208 static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
2217 assert_spin_locked(&dlm->spinlock);
2220 if (res->owner == dlm->node_num)
2227 search_node = dlm->node_num;
2251 static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
2261 assert_spin_locked(&dlm->spinlock);
2298 "dropping ref from lockres\n", dlm->name,
2302 "but ref was not set\n", dlm->name,
2307 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2310 "no locks and had not purged before dying\n", dlm->name,
2312 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2316 __dlm_dirty_lockres(dlm, res);
2319 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
2329 dlm_clean_master_list(dlm, dead_node);
2341 * dead node. once recovery finishes, the dlm thread
2346 bucket = dlm_lockres_hash(dlm, i);
2358 dead_node, dlm->name);
2372 __dlm_do_purge_lockres(dlm, res);
2377 } else if (res->owner == dlm->node_num)
2378 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2384 dlm_revalidate_lvb(dlm, res, dead_node);
2391 dlm->name, res->lockname.len,
2394 __dlm_do_purge_lockres(dlm, res);
2400 dlm_move_lockres_to_recovery_list(dlm, res);
2401 } else if (res->owner == dlm->node_num) {
2402 dlm_free_dead_locks(dlm, res, dead_node);
2403 __dlm_lockres_calc_usage(dlm, res);
2408 dlm->name, res->lockname.len,
2410 dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
2419 static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
2421 assert_spin_locked(&dlm->spinlock);
2423 if (dlm->reco.new_master == idx) {
2425 dlm->name, idx);
2426 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2431 "finalize1 state, clearing\n", dlm->name, idx);
2432 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2433 __dlm_reset_recovery(dlm);
2438 if (dlm->joining_node == idx) {
2440 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
2444 if (!test_bit(idx, dlm->live_nodes_map)) {
2447 dlm->name, idx);
2452 if (!test_bit(idx, dlm->domain_map)) {
2459 clear_bit(idx, dlm->live_nodes_map);
2462 if (!test_bit(idx, dlm->recovery_map))
2463 dlm_do_local_recovery_cleanup(dlm, idx);
2466 dlm_hb_event_notify_attached(dlm, idx, 0);
2469 clear_bit(idx, dlm->domain_map);
2470 clear_bit(idx, dlm->exit_domain_map);
2473 wake_up(&dlm->migration_wq);
2475 set_bit(idx, dlm->recovery_map);
2480 struct dlm_ctxt *dlm = data;
2482 if (!dlm_grab(dlm))
2486 * This will notify any dlm users that a node in our domain
2489 if (test_bit(idx, dlm->domain_map))
2490 dlm_fire_domain_eviction_callbacks(dlm, idx);
2492 spin_lock(&dlm->spinlock);
2493 __dlm_hb_node_down(dlm, idx);
2494 spin_unlock(&dlm->spinlock);
2496 dlm_put(dlm);
2501 struct dlm_ctxt *dlm = data;
2503 if (!dlm_grab(dlm))
2506 spin_lock(&dlm->spinlock);
2507 set_bit(idx, dlm->live_nodes_map);
2510 spin_unlock(&dlm->spinlock);
2512 dlm_put(dlm);
2517 struct dlm_ctxt *dlm = astdata;
2518 mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n",
2519 dlm->node_num, dlm->name);
2523 struct dlm_ctxt *dlm = astdata;
2524 mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n",
2525 dlm->node_num, dlm->name);
2539 * or b) dlm->reco.new_master gets set to some nodenum
2544 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
2551 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
2555 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
2557 dlm_reco_ast, dlm, dlm_reco_bast);
2560 dlm->name, ret, lksb.status);
2563 mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
2564 dlm->name, dlm->node_num);
2568 if (dlm_reco_master_ready(dlm)) {
2570 "do the recovery\n", dlm->name,
2571 dlm->reco.new_master);
2577 spin_lock(&dlm->spinlock);
2578 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
2581 "node got recovered already\n", dlm->name);
2582 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2585 dlm->name, dlm->reco.new_master);
2589 spin_unlock(&dlm->spinlock);
2596 "begin_reco now\n", dlm->name,
2597 dlm->reco.dead_node, dlm->node_num);
2598 status = dlm_send_begin_reco_message(dlm,
2599 dlm->reco.dead_node);
2604 spin_lock(&dlm->spinlock);
2605 dlm_set_reco_master(dlm, dlm->node_num);
2606 spin_unlock(&dlm->spinlock);
2611 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
2614 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
2626 mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
2627 dlm->name, dlm->node_num);
2631 wait_event_timeout(dlm->dlm_reco_thread_wq,
2632 dlm_reco_master_ready(dlm),
2634 if (!dlm_reco_master_ready(dlm)) {
2636 dlm->name);
2641 dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
2644 mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n",
2645 dlm->name, dlm->node_num);
2652 "lksb.status=%s\n", dlm->name, dlm_errname(ret),
2654 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
2668 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
2676 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node);
2678 spin_lock(&dlm->spinlock);
2679 dlm_node_iter_init(dlm->domain_map, &iter);
2680 spin_unlock(&dlm->spinlock);
2685 br.node_idx = dlm->node_num;
2695 if (nodenum == dlm->node_num) {
2702 ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key,
2711 "begin reco msg (%d)\n", dlm->name, nodenum, ret);
2723 "to complete, backoff for a bit\n", dlm->name,
2734 mlog(ML_ERROR, "begin reco of dlm %s to node %u "
2735 "returned %d\n", dlm->name, nodenum, ret);
2736 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
2757 struct dlm_ctxt *dlm = data;
2761 if (!dlm_grab(dlm))
2764 spin_lock(&dlm->spinlock);
2765 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2768 dlm->name, br->node_idx, br->dead_node,
2769 dlm->reco.dead_node, dlm->reco.new_master);
2770 spin_unlock(&dlm->spinlock);
2771 dlm_put(dlm);
2774 spin_unlock(&dlm->spinlock);
2777 dlm->name, br->node_idx, br->dead_node,
2778 dlm->reco.dead_node, dlm->reco.new_master);
2780 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
2782 spin_lock(&dlm->spinlock);
2783 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
2784 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) {
2786 "to %u\n", dlm->name, dlm->reco.new_master,
2790 "to %u\n", dlm->name, dlm->reco.new_master,
2795 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
2797 "node %u changing it to %u\n", dlm->name,
2798 dlm->reco.dead_node, br->node_idx, br->dead_node);
2800 dlm_set_reco_master(dlm, br->node_idx);
2801 dlm_set_reco_dead_node(dlm, br->dead_node);
2802 if (!test_bit(br->dead_node, dlm->recovery_map)) {
2806 if (!test_bit(br->dead_node, dlm->domain_map) ||
2807 !test_bit(br->dead_node, dlm->live_nodes_map))
2813 set_bit(br->dead_node, dlm->domain_map);
2814 set_bit(br->dead_node, dlm->live_nodes_map);
2815 __dlm_hb_node_down(dlm, br->dead_node);
2817 spin_unlock(&dlm->spinlock);
2819 dlm_kick_recovery_thread(dlm);
2822 dlm->name, br->node_idx, br->dead_node,
2823 dlm->reco.dead_node, dlm->reco.new_master);
2825 dlm_put(dlm);
2830 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
2840 "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
2842 spin_lock(&dlm->spinlock);
2843 dlm_node_iter_init(dlm->domain_map, &iter);
2844 spin_unlock(&dlm->spinlock);
2848 fr.node_idx = dlm->node_num;
2849 fr.dead_node = dlm->reco.dead_node;
2854 if (nodenum == dlm->node_num)
2856 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
2863 dlm->key, nodenum);
2889 struct dlm_ctxt *dlm = data;
2894 if (!dlm_grab(dlm))
2901 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
2902 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
2904 spin_lock(&dlm->spinlock);
2906 if (dlm->reco.new_master != fr->node_idx) {
2909 fr->node_idx, dlm->reco.new_master, fr->dead_node);
2912 if (dlm->reco.dead_node != fr->dead_node) {
2915 fr->node_idx, fr->dead_node, dlm->reco.dead_node);
2921 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
2922 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
2926 dlm->name, fr->node_idx, fr->dead_node);
2927 dlm_print_reco_node_status(dlm);
2930 dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
2931 spin_unlock(&dlm->spinlock);
2934 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
2938 dlm->name, fr->node_idx, fr->dead_node);
2939 dlm_print_reco_node_status(dlm);
2942 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
2943 __dlm_reset_recovery(dlm);
2944 spin_unlock(&dlm->spinlock);
2945 dlm_kick_recovery_thread(dlm);
2950 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
2952 dlm_put(dlm);