// SPDX-License-Identifier: GPL-2.0-only
/*
 * VMware vSockets Driver
 *
 * Copyright (C) 2009-2013 VMware, Inc. All rights reserved.
 */

#include <linux/types.h>
#include <linux/socket.h>
#include <linux/stddef.h>
#include <net/sock.h>

#include "vmci_transport_notify.h"

#define PKT_FIELD(vsk, field_name) (vmci_trans(vsk)->notify.pkt.field_name)

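/* Decide whether a peer that has signalled it is waiting to write should be
 * sent a READ notification, i.e. whether enough space has been freed in our
 * consume queue.
 */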
static bool vmci_transport_notify_waiting_write(struct vsock_sock *vsk)
{
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	bool retval;
	u64 notify_limit;

	if (!PKT_FIELD(vsk, peer_waiting_write))
		return false;

#ifdef VSOCK_OPTIMIZATION_FLOW_CONTROL
	/* When the sender blocks, we take that as a sign that the sender is
	 * faster than the receiver. To reduce the transmit rate of the sender,
	 * we delay the sending of the read notification by decreasing the
	 * write_notify_window. The notification is delayed until the number of
	 * bytes used in the queue drops below the write_notify_window.
	 */

	if (!PKT_FIELD(vsk, peer_waiting_write_detected)) {
		PKT_FIELD(vsk, peer_waiting_write_detected) = true;
		if (PKT_FIELD(vsk, write_notify_window) < PAGE_SIZE) {
			PKT_FIELD(vsk, write_notify_window) =
			    PKT_FIELD(vsk, write_notify_min_window);
		} else {
			PKT_FIELD(vsk, write_notify_window) -= PAGE_SIZE;
			if (PKT_FIELD(vsk, write_notify_window) <
			    PKT_FIELD(vsk, write_notify_min_window))
				PKT_FIELD(vsk, write_notify_window) =
				    PKT_FIELD(vsk, write_notify_min_window);
		}
	}
	notify_limit = vmci_trans(vsk)->consume_size -
		PKT_FIELD(vsk, write_notify_window);
#else
	notify_limit = 0;
#endif

	/* For now we ignore the wait information and just see if the free
	 * space exceeds the notify limit.  Note that improving this function
	 * to be more intelligent will not require a protocol change and will
	 * retain compatibility between endpoints with mixed versions of this
	 * function.
	 *
	 * The notify_limit is used to delay notifications in the case where
	 * flow control is enabled.  Below, the test is expressed in terms of
	 * free space in the queue: if free_space > ConsumeSize -
	 * write_notify_window then notify.  An alternate way of expressing
	 * this is to rewrite the expression in terms of the data ready in the
	 * receive queue: if write_notify_window > bufferReady then notify,
	 * since free_space == ConsumeSize - bufferReady.
	 */
	retval = vmci_qpair_consume_free_space(vmci_trans(vsk)->qpair) >
		notify_limit;
#ifdef VSOCK_OPTIMIZATION_FLOW_CONTROL
	if (retval) {
		/* Once we notify the peer, we reset the detected flag so the
		 * next wait will again cause a decrease in the window size.
		 */
		PKT_FIELD(vsk, peer_waiting_write_detected) = false;
	}
#endif
	return retval;
#else
	return true;
#endif
}

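/* Decide whether a peer that has signalled it is waiting to read should be
 * sent a WROTE notification, i.e. whether there is data in our produce queue
 * for it to consume.
 */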
static bool vmci_transport_notify_waiting_read(struct vsock_sock *vsk)
{
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	if (!PKT_FIELD(vsk, peer_waiting_read))
		return false;

	/* For now we ignore the wait information and just see if there is any
	 * data for our peer to read.  Note that improving this function to be
	 * more intelligent will not require a protocol change and will retain
	 * compatibility between endpoints with mixed versions of this
	 * function.
	 */
	return vmci_qpair_produce_buf_ready(vmci_trans(vsk)->qpair) > 0;
#else
	return true;
#endif
}

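/* Handle a WAITING_READ control packet: record that the peer is waiting for
 * data and, if there is already data for it to read, send a WROTE
 * notification right away.
 */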
static void
vmci_transport_handle_waiting_read(struct sock *sk,
				   struct vmci_transport_packet *pkt,
				   bool bottom_half,
				   struct sockaddr_vm *dst,
				   struct sockaddr_vm *src)
{
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	struct vsock_sock *vsk;

	vsk = vsock_sk(sk);

	PKT_FIELD(vsk, peer_waiting_read) = true;
	memcpy(&PKT_FIELD(vsk, peer_waiting_read_info), &pkt->u.wait,
	       sizeof(PKT_FIELD(vsk, peer_waiting_read_info)));

	if (vmci_transport_notify_waiting_read(vsk)) {
		bool sent;

		if (bottom_half)
			sent = vmci_transport_send_wrote_bh(dst, src) > 0;
		else
			sent = vmci_transport_send_wrote(sk) > 0;

		if (sent)
			PKT_FIELD(vsk, peer_waiting_read) = false;
	}
#endif
}

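/* Handle a WAITING_WRITE control packet: record that the peer is waiting for
 * space and, if enough space is already available, send a READ notification
 * right away.
 */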
static void
vmci_transport_handle_waiting_write(struct sock *sk,
				    struct vmci_transport_packet *pkt,
				    bool bottom_half,
				    struct sockaddr_vm *dst,
				    struct sockaddr_vm *src)
{
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	struct vsock_sock *vsk;

	vsk = vsock_sk(sk);

	PKT_FIELD(vsk, peer_waiting_write) = true;
	memcpy(&PKT_FIELD(vsk, peer_waiting_write_info), &pkt->u.wait,
	       sizeof(PKT_FIELD(vsk, peer_waiting_write_info)));

	if (vmci_transport_notify_waiting_write(vsk)) {
		bool sent;

		if (bottom_half)
			sent = vmci_transport_send_read_bh(dst, src) > 0;
		else
			sent = vmci_transport_send_read(sk) > 0;

		if (sent)
			PKT_FIELD(vsk, peer_waiting_write) = false;
	}
#endif
}

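/* Handle a READ control packet: the peer has consumed data, so clear our
 * sent_waiting_write state and wake up any writers blocked on this socket.
 */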
static void
vmci_transport_handle_read(struct sock *sk,
			   struct vmci_transport_packet *pkt,
			   bool bottom_half,
			   struct sockaddr_vm *dst, struct sockaddr_vm *src)
{
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	struct vsock_sock *vsk;

	vsk = vsock_sk(sk);
	PKT_FIELD(vsk, sent_waiting_write) = false;
#endif

	sk->sk_write_space(sk);
}

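/* Tell the peer we are waiting to read by sending a WAITING_READ packet that
 * records where in the consume queue room_needed bytes will become
 * available.  The write notify window is also grown by a page, up to the
 * consume queue size.
 */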
static bool send_waiting_read(struct sock *sk, u64 room_needed)
{
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	struct vsock_sock *vsk;
	struct vmci_transport_waiting_info waiting_info;
	u64 tail;
	u64 head;
	u64 room_left;
	bool ret;

	vsk = vsock_sk(sk);

	if (PKT_FIELD(vsk, sent_waiting_read))
		return true;

	if (PKT_FIELD(vsk, write_notify_window) <
			vmci_trans(vsk)->consume_size)
		PKT_FIELD(vsk, write_notify_window) =
		    min(PKT_FIELD(vsk, write_notify_window) + PAGE_SIZE,
			vmci_trans(vsk)->consume_size);

	vmci_qpair_get_consume_indexes(vmci_trans(vsk)->qpair, &tail, &head);
	room_left = vmci_trans(vsk)->consume_size - head;
	if (room_needed >= room_left) {
		waiting_info.offset = room_needed - room_left;
		waiting_info.generation =
		    PKT_FIELD(vsk, consume_q_generation) + 1;
	} else {
		waiting_info.offset = head + room_needed;
		waiting_info.generation = PKT_FIELD(vsk, consume_q_generation);
	}

	ret = vmci_transport_send_waiting_read(sk, &waiting_info) > 0;
	if (ret)
		PKT_FIELD(vsk, sent_waiting_read) = true;

	return ret;
#else
	return true;
#endif
}

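/* Tell the peer we are waiting to write by sending a WAITING_WRITE packet
 * that records where in the produce queue room_needed bytes of space will
 * become available.
 */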
static bool send_waiting_write(struct sock *sk, u64 room_needed)
{
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	struct vsock_sock *vsk;
	struct vmci_transport_waiting_info waiting_info;
	u64 tail;
	u64 head;
	u64 room_left;
	bool ret;

	vsk = vsock_sk(sk);

	if (PKT_FIELD(vsk, sent_waiting_write))
		return true;

	vmci_qpair_get_produce_indexes(vmci_trans(vsk)->qpair, &tail, &head);
	room_left = vmci_trans(vsk)->produce_size - tail;
	if (room_needed + 1 >= room_left) {
		/* Wraps around to current generation. */
		waiting_info.offset = room_needed + 1 - room_left;
		waiting_info.generation = PKT_FIELD(vsk, produce_q_generation);
	} else {
		waiting_info.offset = tail + room_needed + 1;
		waiting_info.generation =
		    PKT_FIELD(vsk, produce_q_generation) - 1;
	}

	ret = vmci_transport_send_waiting_write(sk, &waiting_info) > 0;
	if (ret)
		PKT_FIELD(vsk, sent_waiting_write) = true;

	return ret;
#else
	return true;
#endif
}

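/* Send a READ notification to a peer that is waiting to write, retrying on
 * failure up to VMCI_TRANSPORT_MAX_DGRAM_RESENDS times.
 */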
static int vmci_transport_send_read_notification(struct sock *sk)
{
	struct vsock_sock *vsk;
	bool sent_read;
	unsigned int retries;
	int err;

	vsk = vsock_sk(sk);
	sent_read = false;
	retries = 0;
	err = 0;

	if (vmci_transport_notify_waiting_write(vsk)) {
		/* Notify the peer that we have read, retrying the send on
		 * failure up to our maximum value.  XXX For now we just log
		 * the failure, but later we should schedule a work item to
		 * handle the resend until it succeeds.  That would require
		 * keeping track of work items in the vsk and cleaning them up
		 * upon socket close.
		 */
		while (!(vsk->peer_shutdown & RCV_SHUTDOWN) &&
		       !sent_read &&
		       retries < VMCI_TRANSPORT_MAX_DGRAM_RESENDS) {
			err = vmci_transport_send_read(sk);
			if (err >= 0)
				sent_read = true;

			retries++;
		}

		if (retries >= VMCI_TRANSPORT_MAX_DGRAM_RESENDS)
			pr_err("%p unable to send read notify to peer\n", sk);
		else
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
			PKT_FIELD(vsk, peer_waiting_write) = false;
#endif

	}
	return err;
}

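/* Handle a WROTE control packet: the peer has produced data, so clear our
 * sent_waiting_read state and signal that the socket has data ready.
 */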
static void
vmci_transport_handle_wrote(struct sock *sk,
			    struct vmci_transport_packet *pkt,
			    bool bottom_half,
			    struct sockaddr_vm *dst, struct sockaddr_vm *src)
{
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	struct vsock_sock *vsk = vsock_sk(sk);

	PKT_FIELD(vsk, sent_waiting_read) = false;
#endif
	sk->sk_data_ready(sk);
}

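/* Set up the initial notification state for a socket; both notify windows
 * start out at one page.
 */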
static void vmci_transport_notify_pkt_socket_init(struct sock *sk)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	PKT_FIELD(vsk, write_notify_window) = PAGE_SIZE;
	PKT_FIELD(vsk, write_notify_min_window) = PAGE_SIZE;
	PKT_FIELD(vsk, peer_waiting_read) = false;
	PKT_FIELD(vsk, peer_waiting_write) = false;
	PKT_FIELD(vsk, peer_waiting_write_detected) = false;
	PKT_FIELD(vsk, sent_waiting_read) = false;
	PKT_FIELD(vsk, sent_waiting_write) = false;
	PKT_FIELD(vsk, produce_q_generation) = 0;
	PKT_FIELD(vsk, consume_q_generation) = 0;

	memset(&PKT_FIELD(vsk, peer_waiting_read_info), 0,
	       sizeof(PKT_FIELD(vsk, peer_waiting_read_info)));
	memset(&PKT_FIELD(vsk, peer_waiting_write_info), 0,
	       sizeof(PKT_FIELD(vsk, peer_waiting_write_info)));
}

static void vmci_transport_notify_pkt_socket_destruct(struct vsock_sock *vsk)
{
}

static int
vmci_transport_notify_pkt_poll_in(struct sock *sk,
				  size_t target, bool *data_ready_now)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	if (vsock_stream_has_data(vsk)) {
		*data_ready_now = true;
	} else {
		/* We can't read right now because there is nothing in the
		 * queue. Ask for notifications when there is something to
		 * read.
		 */
		if (sk->sk_state == TCP_ESTABLISHED) {
			if (!send_waiting_read(sk, 1))
				return -1;
		}
		*data_ready_now = false;
	}

	return 0;
}

static int
vmci_transport_notify_pkt_poll_out(struct sock *sk,
				   size_t target, bool *space_avail_now)
{
	s64 produce_q_free_space;
	struct vsock_sock *vsk = vsock_sk(sk);

	produce_q_free_space = vsock_stream_has_space(vsk);
	if (produce_q_free_space > 0) {
		*space_avail_now = true;
		return 0;
	} else if (produce_q_free_space == 0) {
		/* This is a connected socket but we can't currently send data.
		 * Notify the peer that we are waiting if the queue is full. We
		 * only send a waiting write if the queue is full because
		 * otherwise we end up in an infinite WAITING_WRITE, READ,
		 * WAITING_WRITE, READ, etc. loop. Treat failing to send the
		 * notification as a socket error, passing that back through
		 * the mask.
		 */
		if (!send_waiting_write(sk, 1))
			return -1;

		*space_avail_now = false;
	}

	return 0;
}

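/* Prepare to receive: reset the cached queue indexes and, if flow control is
 * enabled, make sure the write notify window is at least large enough to
 * cover the number of bytes the caller is waiting for.
 */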
static int
vmci_transport_notify_pkt_recv_init(
			struct sock *sk,
			size_t target,
			struct vmci_transport_recv_notify_data *data)
{
	struct vsock_sock *vsk = vsock_sk(sk);

#ifdef VSOCK_OPTIMIZATION_WAITING_NOTIFY
	data->consume_head = 0;
	data->produce_tail = 0;
#ifdef VSOCK_OPTIMIZATION_FLOW_CONTROL
	data->notify_on_block = false;

	if (PKT_FIELD(vsk, write_notify_min_window) < target + 1) {
		PKT_FIELD(vsk, write_notify_min_window) = target + 1;
		if (PKT_FIELD(vsk, write_notify_window) <
		    PKT_FIELD(vsk, write_notify_min_window)) {
			/* If the current window is smaller than the new
			 * minimal window size, we need to reevaluate whether
			 * we need to notify the sender.  If the number of
			 * ready bytes is smaller than the new window, we need
			 * to send a notification to the sender before we
			 * block.
			 */
			PKT_FIELD(vsk, write_notify_window) =
			    PKT_FIELD(vsk, write_notify_min_window);
			data->notify_on_block = true;
		}
	}
#endif
#endif

	return 0;
}

static int
vmci_transport_notify_pkt_recv_pre_block(
				struct sock *sk,
				size_t target,
				struct vmci_transport_recv_notify_data *data)
{
	int err = 0;

	/* Notify our peer that we are waiting for data to read. */
	if (!send_waiting_read(sk, target)) {
		err = -EHOSTUNREACH;
		return err;
	}
#ifdef VSOCK_OPTIMIZATION_FLOW_CONTROL
	if (data->notify_on_block) {
		err = vmci_transport_send_read_notification(sk);
		if (err < 0)
			return err;

		data->notify_on_block = false;
	}
#endif

	return err;
}

static int
vmci_transport_notify_pkt_recv_pre_dequeue(
				struct sock *sk,
				size_t target,
				struct vmci_transport_recv_notify_data *data)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	/* The caller will now consume up to target bytes from the queue.  Note
	 * that since we hold the socket lock, we can copy at least the bytes
	 * that are currently ready.
	 */
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	vmci_qpair_get_consume_indexes(vmci_trans(vsk)->qpair,
				       &data->produce_tail,
				       &data->consume_head);
#endif

	return 0;
}

static int
vmci_transport_notify_pkt_recv_post_dequeue(
				struct sock *sk,
				size_t target,
				ssize_t copied,
				bool data_read,
				struct vmci_transport_recv_notify_data *data)
{
	struct vsock_sock *vsk;
	int err;

	vsk = vsock_sk(sk);
	err = 0;

	if (data_read) {
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
		/* Detect a wrap-around to maintain queue generation.  Note
		 * that this is safe since we hold the socket lock across the
		 * two queue pair operations.
		 */
		if (copied >=
			vmci_trans(vsk)->consume_size - data->consume_head)
			PKT_FIELD(vsk, consume_q_generation)++;
#endif

		err = vmci_transport_send_read_notification(sk);
		if (err < 0)
			return err;
	}
	return err;
}

static int
vmci_transport_notify_pkt_send_init(
			struct sock *sk,
			struct vmci_transport_send_notify_data *data)
{
#ifdef VSOCK_OPTIMIZATION_WAITING_NOTIFY
	data->consume_head = 0;
	data->produce_tail = 0;
#endif

	return 0;
}

static int
vmci_transport_notify_pkt_send_pre_block(
				struct sock *sk,
				struct vmci_transport_send_notify_data *data)
{
	/* Notify our peer that we are waiting for room to write. */
	if (!send_waiting_write(sk, 1))
		return -EHOSTUNREACH;

	return 0;
}

static int
vmci_transport_notify_pkt_send_pre_enqueue(
				struct sock *sk,
				struct vmci_transport_send_notify_data *data)
{
	struct vsock_sock *vsk = vsock_sk(sk);

#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	vmci_qpair_get_produce_indexes(vmci_trans(vsk)->qpair,
				       &data->produce_tail,
				       &data->consume_head);
#endif

	return 0;
}

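/* After data has been enqueued, bump the produce queue generation on
 * wrap-around and, if the peer is waiting to read, send it a WROTE
 * notification, retrying on failure up to VMCI_TRANSPORT_MAX_DGRAM_RESENDS
 * times.
 */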
static int
vmci_transport_notify_pkt_send_post_enqueue(
				struct sock *sk,
				ssize_t written,
				struct vmci_transport_send_notify_data *data)
{
	int err = 0;
	struct vsock_sock *vsk;
	bool sent_wrote = false;
	int retries = 0;

	vsk = vsock_sk(sk);

#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
	/* Detect a wrap-around to maintain queue generation.  Note that this
	 * is safe since we hold the socket lock across the two queue pair
	 * operations.
	 */
	if (written >= vmci_trans(vsk)->produce_size - data->produce_tail)
		PKT_FIELD(vsk, produce_q_generation)++;
#endif

	if (vmci_transport_notify_waiting_read(vsk)) {
		/* Notify the peer that we have written, retrying the send on
		 * failure up to our maximum value.  See the XXX comment in
		 * vmci_transport_send_read_notification() for potential
		 * improvements.
		 */
		while (!(vsk->peer_shutdown & RCV_SHUTDOWN) &&
		       !sent_wrote &&
		       retries < VMCI_TRANSPORT_MAX_DGRAM_RESENDS) {
			err = vmci_transport_send_wrote(sk);
			if (err >= 0)
				sent_wrote = true;

			retries++;
		}

		if (retries >= VMCI_TRANSPORT_MAX_DGRAM_RESENDS) {
			pr_err("%p unable to send wrote notify to peer\n", sk);
			return err;
		} else {
#if defined(VSOCK_OPTIMIZATION_WAITING_NOTIFY)
			PKT_FIELD(vsk, peer_waiting_read) = false;
#endif
		}
	}
	return err;
}

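/* Dispatch an incoming notification control packet to the matching handler
 * and report whether the packet type was recognized.
 */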
static void
vmci_transport_notify_pkt_handle_pkt(
			struct sock *sk,
			struct vmci_transport_packet *pkt,
			bool bottom_half,
			struct sockaddr_vm *dst,
			struct sockaddr_vm *src, bool *pkt_processed)
{
	bool processed = false;

	switch (pkt->type) {
	case VMCI_TRANSPORT_PACKET_TYPE_WROTE:
		vmci_transport_handle_wrote(sk, pkt, bottom_half, dst, src);
		processed = true;
		break;
	case VMCI_TRANSPORT_PACKET_TYPE_READ:
		vmci_transport_handle_read(sk, pkt, bottom_half, dst, src);
		processed = true;
		break;
	case VMCI_TRANSPORT_PACKET_TYPE_WAITING_WRITE:
		vmci_transport_handle_waiting_write(sk, pkt, bottom_half,
						    dst, src);
		processed = true;
		break;
	case VMCI_TRANSPORT_PACKET_TYPE_WAITING_READ:
		vmci_transport_handle_waiting_read(sk, pkt, bottom_half,
						   dst, src);
		processed = true;
		break;
	}

	if (pkt_processed)
		*pkt_processed = processed;
}

static void vmci_transport_notify_pkt_process_request(struct sock *sk)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	PKT_FIELD(vsk, write_notify_window) = vmci_trans(vsk)->consume_size;
	if (vmci_trans(vsk)->consume_size <
		PKT_FIELD(vsk, write_notify_min_window))
		PKT_FIELD(vsk, write_notify_min_window) =
			vmci_trans(vsk)->consume_size;
}

static void vmci_transport_notify_pkt_process_negotiate(struct sock *sk)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	PKT_FIELD(vsk, write_notify_window) = vmci_trans(vsk)->consume_size;
	if (vmci_trans(vsk)->consume_size <
		PKT_FIELD(vsk, write_notify_min_window))
		PKT_FIELD(vsk, write_notify_min_window) =
			vmci_trans(vsk)->consume_size;
}

/* Socket control packet based operations. */
const struct vmci_transport_notify_ops vmci_transport_notify_pkt_ops = {
	.socket_init = vmci_transport_notify_pkt_socket_init,
	.socket_destruct = vmci_transport_notify_pkt_socket_destruct,
	.poll_in = vmci_transport_notify_pkt_poll_in,
	.poll_out = vmci_transport_notify_pkt_poll_out,
	.handle_notify_pkt = vmci_transport_notify_pkt_handle_pkt,
	.recv_init = vmci_transport_notify_pkt_recv_init,
	.recv_pre_block = vmci_transport_notify_pkt_recv_pre_block,
	.recv_pre_dequeue = vmci_transport_notify_pkt_recv_pre_dequeue,
	.recv_post_dequeue = vmci_transport_notify_pkt_recv_post_dequeue,
	.send_init = vmci_transport_notify_pkt_send_init,
	.send_pre_block = vmci_transport_notify_pkt_send_pre_block,
	.send_pre_enqueue = vmci_transport_notify_pkt_send_pre_enqueue,
	.send_post_enqueue = vmci_transport_notify_pkt_send_post_enqueue,
	.process_request = vmci_transport_notify_pkt_process_request,
	.process_negotiate = vmci_transport_notify_pkt_process_negotiate,
};