Lines Matching refs:node
29 * Due the fact that dlm has pre-configured node addresses on every side
35 * application layer. The version field of every node will be set on these RCOM
36 * messages as soon as they arrived and the node isn't yet part of the nodes
47 * compatibility. A node cannot send anything to another node when a DLM_FIN
51 * manager removed the node from internal lists, at this point DLM does not
52 * send any message to the other node. There exists two cases:
160 * We could send a fence signal for a specific node to the cluster
185 /* counts how many lockspaces are using this node
187 * node wants to disconnect.
200 struct midcomms_node *node;
206 void (*ack_rcv)(struct midcomms_node *node);
255 const char *dlm_midcomms_state(struct midcomms_node *node)
257 return dlm_state_str(node->state);
260 unsigned long dlm_midcomms_flags(struct midcomms_node *node)
262 return node->flags;
265 int dlm_midcomms_send_queue_cnt(struct midcomms_node *node)
267 return atomic_read(&node->send_queue_cnt);
270 uint32_t dlm_midcomms_version(struct midcomms_node *node)
272 return node->version;
277 struct midcomms_node *node;
279 hlist_for_each_entry_rcu(node, &node_hash[r], hlist) {
280 if (node->nodeid == nodeid)
281 return node;
295 static void dlm_mhandle_delete(struct midcomms_node *node,
299 atomic_dec(&node->send_queue_cnt);
303 static void dlm_send_queue_flush(struct midcomms_node *node)
307 pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
310 spin_lock_bh(&node->send_queue_lock);
311 list_for_each_entry_rcu(mh, &node->send_queue, list) {
312 dlm_mhandle_delete(node, mh);
314 spin_unlock_bh(&node->send_queue_lock);
318 static void midcomms_node_reset(struct midcomms_node *node)
320 pr_debug("reset node %d\n", node->nodeid);
322 atomic_set(&node->seq_next, DLM_SEQ_INIT);
323 atomic_set(&node->seq_send, DLM_SEQ_INIT);
324 atomic_set(&node->ulp_delivered, 0);
325 node->version = DLM_VERSION_NOT_SET;
326 node->flags = 0;
328 dlm_send_queue_flush(node);
329 node->state = DLM_CLOSED;
330 wake_up(&node->shutdown_wait);
341 struct midcomms_node *node;
348 node = __find_node(nodeid, r);
349 if (node) {
355 node = kmalloc(sizeof(*node), GFP_NOFS);
356 if (!node)
359 node->nodeid = nodeid;
360 spin_lock_init(&node->state_lock);
361 spin_lock_init(&node->send_queue_lock);
362 atomic_set(&node->send_queue_cnt, 0);
363 INIT_LIST_HEAD(&node->send_queue);
364 init_waitqueue_head(&node->shutdown_wait);
365 node->users = 0;
366 midcomms_node_reset(node);
369 hlist_add_head_rcu(&node->hlist, &node_hash[r]);
372 node->debugfs = dlm_create_debug_comms_file(nodeid, node);
402 static void dlm_send_ack_threshold(struct midcomms_node *node,
410 oval = atomic_read(&node->ulp_delivered);
418 } while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);
421 dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
424 static int dlm_send_fin(struct midcomms_node *node,
425 void (*ack_rcv)(struct midcomms_node *node))
432 mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_ATOMIC, &ppc);
436 set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
446 pr_debug("sending fin msg to node %d\n", node->nodeid);
452 static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
457 list_for_each_entry_rcu(mh, &node->send_queue, list) {
460 mh->ack_rcv(node);
467 spin_lock_bh(&node->send_queue_lock);
468 list_for_each_entry_rcu(mh, &node->send_queue, list) {
470 dlm_mhandle_delete(node, mh);
476 spin_unlock_bh(&node->send_queue_lock);
480 static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
482 spin_lock(&node->state_lock);
483 pr_debug("receive passive fin ack from node %d with state %s\n",
484 node->nodeid, dlm_state_str(node->state));
486 switch (node->state) {
489 midcomms_node_reset(node);
493 wake_up(&node->shutdown_wait);
496 spin_unlock(&node->state_lock);
498 __func__, node->state);
502 spin_unlock(&node->state_lock);
521 struct midcomms_node *node,
528 oval = atomic_read(&node->seq_next);
534 } while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
539 spin_lock(&node->state_lock);
540 pr_debug("receive fin msg from node %d with state %s\n",
541 node->nodeid, dlm_state_str(node->state));
543 switch (node->state) {
545 dlm_send_ack(node->nodeid, nval);
548 * additional we check if the node is used by
551 if (node->users == 0) {
552 node->state = DLM_LAST_ACK;
553 pr_debug("switch node %d to state %s case 1\n",
554 node->nodeid, dlm_state_str(node->state));
555 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
556 dlm_send_fin(node, dlm_pas_fin_ack_rcv);
558 node->state = DLM_CLOSE_WAIT;
559 pr_debug("switch node %d to state %s\n",
560 node->nodeid, dlm_state_str(node->state));
564 dlm_send_ack(node->nodeid, nval);
565 node->state = DLM_CLOSING;
566 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
567 pr_debug("switch node %d to state %s\n",
568 node->nodeid, dlm_state_str(node->state));
571 dlm_send_ack(node->nodeid, nval);
572 midcomms_node_reset(node);
573 pr_debug("switch node %d to state %s\n",
574 node->nodeid, dlm_state_str(node->state));
580 spin_unlock(&node->state_lock);
582 __func__, node->state);
586 spin_unlock(&node->state_lock);
589 WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
591 dlm_receive_buffer(p, node->nodeid);
592 atomic_inc(&node->ulp_delivered);
594 dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
599 * current node->seq_next number as ack.
602 dlm_send_ack(node->nodeid, oval);
605 seq, oval, node->nodeid);
628 log_print("fin too small: %d, will skip this message from node %d",
636 log_print("msg too small: %d, will skip this message from node %d",
644 log_print("rcom msg too small: %d, will skip this message from node %d",
651 log_print("unsupported o_nextcmd received: %u, will skip this message from node %d",
662 struct midcomms_node *node;
667 node = nodeid2node(nodeid);
668 if (WARN_ON_ONCE(!node))
671 switch (node->version) {
673 node->version = DLM_VERSION_3_2;
674 wake_up(&node->shutdown_wait);
675 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
676 node->nodeid);
678 spin_lock(&node->state_lock);
679 switch (node->state) {
681 node->state = DLM_ESTABLISHED;
682 pr_debug("switch node %d to state %s\n",
683 node->nodeid, dlm_state_str(node->state));
688 spin_unlock(&node->state_lock);
694 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
695 DLM_VERSION_3_2, node->nodeid, node->version);
717 log_print("unsupported rcom type received: %u, will skip this message from node %d",
722 WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
730 log_print("opts msg too small: %u, will skip this message from node %d",
743 log_print("inner rcom msg too small: %u, will skip this message from node %d",
751 log_print("inner msg too small: %u, will skip this message from node %d",
759 log_print("inner fin too small: %u, will skip this message from node %d",
766 log_print("unsupported inner h_cmd received: %u, will skip this message from node %d",
771 dlm_midcomms_receive_buffer(p, node, seq);
775 dlm_receive_ack(node, seq);
778 log_print("unsupported h_cmd received: %u, will skip this message from node %d",
790 struct midcomms_node *node;
794 node = nodeid2node(nodeid);
795 if (WARN_ON_ONCE(!node)) {
800 switch (node->version) {
802 node->version = DLM_VERSION_3_1;
803 wake_up(&node->shutdown_wait);
804 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
805 node->nodeid);
810 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
811 DLM_VERSION_3_1, node->nodeid, node->version);
823 log_print("msg too small: %u, will skip this message from node %d",
830 log_print("unsupported h_cmd received: %u, will skip this message from node %d",
862 log_print("received invalid length header: %u from node %d, will abort message parsing",
907 log_print("received invalid version header: %u from node %d, will skip this message",
922 struct midcomms_node *node;
927 node = nodeid2node(nodeid);
928 if (WARN_ON_ONCE(!node)) {
934 switch (node->version) {
943 list_for_each_entry_rcu(mh, &node->send_queue, list) {
950 mh->seq, node->nodeid);
970 atomic_inc(&mh->node->send_queue_cnt);
972 spin_lock_bh(&mh->node->send_queue_lock);
973 list_add_tail_rcu(&mh->list, &mh->node->send_queue);
974 spin_unlock_bh(&mh->node->send_queue_lock);
976 mh->seq = atomic_fetch_inc(&mh->node->seq_send);
1008 struct midcomms_node *node;
1014 node = nodeid2node(nodeid);
1015 if (WARN_ON_ONCE(!node))
1019 WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
1028 mh->node = node;
1030 switch (node->version) {
1042 dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
1077 trace_dlm_send_message(mh->node->nodeid, mh->seq,
1082 trace_dlm_send_rcom(mh->node->nodeid, mh->seq,
1109 switch (mh->node->version) {
1159 struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
1161 WARN_ON_ONCE(atomic_read(&node->send_queue_cnt));
1162 dlm_send_queue_flush(node);
1163 kfree(node);
1168 struct midcomms_node *node;
1173 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1174 dlm_delete_debug_comms_file(node->debugfs);
1177 hlist_del_rcu(&node->hlist);
1180 call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
1188 static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
1190 spin_lock(&node->state_lock);
1191 pr_debug("receive active fin ack from node %d with state %s\n",
1192 node->nodeid, dlm_state_str(node->state));
1194 switch (node->state) {
1196 node->state = DLM_FIN_WAIT2;
1197 pr_debug("switch node %d to state %s\n",
1198 node->nodeid, dlm_state_str(node->state));
1201 midcomms_node_reset(node);
1202 pr_debug("switch node %d to state %s\n",
1203 node->nodeid, dlm_state_str(node->state));
1207 wake_up(&node->shutdown_wait);
1210 spin_unlock(&node->state_lock);
1212 __func__, node->state);
1216 spin_unlock(&node->state_lock);
1221 struct midcomms_node *node;
1225 node = nodeid2node(nodeid);
1226 if (WARN_ON_ONCE(!node)) {
1231 spin_lock(&node->state_lock);
1232 if (!node->users) {
1233 pr_debug("receive add member from node %d with state %s\n",
1234 node->nodeid, dlm_state_str(node->state));
1235 switch (node->state) {
1239 node->state = DLM_ESTABLISHED;
1240 pr_debug("switch node %d to state %s\n",
1241 node->nodeid, dlm_state_str(node->state));
1248 log_print("reset node %d because shutdown stuck",
1249 node->nodeid);
1251 midcomms_node_reset(node);
1252 node->state = DLM_ESTABLISHED;
1257 node->users++;
1258 pr_debug("node %d users inc count %d\n", nodeid, node->users);
1259 spin_unlock(&node->state_lock);
1266 struct midcomms_node *node;
1270 node = nodeid2node(nodeid);
1271 /* in case of dlm_midcomms_close() removes node */
1272 if (!node) {
1277 spin_lock(&node->state_lock);
1278 /* case of dlm_midcomms_addr() created node but
1280 * removed the node
1282 if (!node->users) {
1283 spin_unlock(&node->state_lock);
1288 node->users--;
1289 pr_debug("node %d users dec count %d\n", nodeid, node->users);
1295 if (node->users == 0) {
1296 pr_debug("receive remove member from node %d with state %s\n",
1297 node->nodeid, dlm_state_str(node->state));
1298 switch (node->state) {
1303 node->state = DLM_LAST_ACK;
1304 pr_debug("switch node %d to state %s case 2\n",
1305 node->nodeid, dlm_state_str(node->state));
1306 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
1307 dlm_send_fin(node, dlm_pas_fin_ack_rcv);
1317 __func__, node->state);
1321 spin_unlock(&node->state_lock);
1328 struct midcomms_node *node;
1333 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1334 ret = wait_event_timeout(node->shutdown_wait,
1335 node->version != DLM_VERSION_NOT_SET ||
1336 node->state == DLM_CLOSED ||
1337 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
1339 if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags))
1340 pr_debug("version wait timed out for node %d with state %s\n",
1341 node->nodeid, dlm_state_str(node->state));
1347 static void midcomms_shutdown(struct midcomms_node *node)
1352 switch (node->version) {
1359 spin_lock(&node->state_lock);
1360 pr_debug("receive active shutdown for node %d with state %s\n",
1361 node->nodeid, dlm_state_str(node->state));
1362 switch (node->state) {
1364 node->state = DLM_FIN_WAIT1;
1365 pr_debug("switch node %d to state %s case 2\n",
1366 node->nodeid, dlm_state_str(node->state));
1367 dlm_send_fin(node, dlm_act_fin_ack_rcv);
1378 spin_unlock(&node->state_lock);
1384 ret = wait_event_timeout(node->shutdown_wait,
1385 node->state == DLM_CLOSED ||
1386 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
1389 pr_debug("active shutdown timed out for node %d with state %s\n",
1390 node->nodeid, dlm_state_str(node->state));
1392 pr_debug("active shutdown done for node %d with state %s\n",
1393 node->nodeid, dlm_state_str(node->state));
1398 struct midcomms_node *node;
1404 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1405 midcomms_shutdown(node);
1412 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1413 midcomms_node_reset(node);
1422 struct midcomms_node *node;
1427 node = nodeid2node(nodeid);
1428 if (node) {
1430 set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
1431 wake_up(&node->shutdown_wait);
1439 node = nodeid2node(nodeid);
1440 if (!node) {
1447 dlm_delete_debug_comms_file(node->debugfs);
1450 hlist_del_rcu(&node->hlist);
1458 * this function get called when the node is fenced
1460 dlm_send_queue_flush(node);
1462 call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
1470 struct midcomms_node *node;
1486 h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
1495 int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
1502 rd.node = node;
1505 msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,