1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14 
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18 
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25 	struct rxrpc_sock *rx;
26 	struct sock *sk;
27 
28 	_enter("%d", call->debug_id);
29 
30 	if (!list_empty(&call->recvmsg_link))
31 		return;
32 
33 	rcu_read_lock();
34 
35 	rx = rcu_dereference(call->socket);
36 	sk = &rx->sk;
37 	if (rx && sk->sk_state < RXRPC_CLOSE) {
38 		if (call->notify_rx) {
39 			spin_lock_irq(&call->notify_lock);
40 			call->notify_rx(sk, call, call->user_call_ID);
41 			spin_unlock_irq(&call->notify_lock);
42 		} else {
43 			spin_lock_irq(&rx->recvmsg_lock);
44 			if (list_empty(&call->recvmsg_link)) {
45 				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
46 				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
47 			}
48 			spin_unlock_irq(&rx->recvmsg_lock);
49 
50 			if (!sock_flag(sk, SOCK_DEAD)) {
51 				_debug("call %ps", sk->sk_data_ready);
52 				sk->sk_data_ready(sk);
53 			}
54 		}
55 	}
56 
57 	rcu_read_unlock();
58 	_leave("");
59 }
60 
61 /*
62  * Pass a call terminating message to userspace.
63  */
64 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
65 {
66 	u32 tmp = 0;
67 	int ret;
68 
69 	switch (call->completion) {
70 	case RXRPC_CALL_SUCCEEDED:
71 		ret = 0;
72 		if (rxrpc_is_service_call(call))
73 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
74 		break;
75 	case RXRPC_CALL_REMOTELY_ABORTED:
76 		tmp = call->abort_code;
77 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
78 		break;
79 	case RXRPC_CALL_LOCALLY_ABORTED:
80 		tmp = call->abort_code;
81 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
82 		break;
83 	case RXRPC_CALL_NETWORK_ERROR:
84 		tmp = -call->error;
85 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
86 		break;
87 	case RXRPC_CALL_LOCAL_ERROR:
88 		tmp = -call->error;
89 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
90 		break;
91 	default:
92 		pr_err("Invalid terminal call state %u\n", call->completion);
93 		BUG();
94 		break;
95 	}
96 
97 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
98 			     call->ackr_window - 1,
99 			     call->rx_pkt_offset, call->rx_pkt_len, ret);
100 	return ret;
101 }
102 
103 /*
104  * Discard a packet we've used up and advance the Rx window by one.
105  */
106 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
107 {
108 	struct rxrpc_skb_priv *sp;
109 	struct sk_buff *skb;
110 	rxrpc_serial_t serial;
111 	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
112 	bool last;
113 	int acked;
114 
115 	_enter("%d", call->debug_id);
116 
117 	skb = skb_dequeue(&call->recvmsg_queue);
118 	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);
119 
120 	sp = rxrpc_skb(skb);
121 	tseq   = sp->hdr.seq;
122 	serial = sp->hdr.serial;
123 	last   = sp->hdr.flags & RXRPC_LAST_PACKET;
124 
125 	/* Barrier against rxrpc_input_data(). */
126 	if (after(tseq, call->rx_consumed))
127 		smp_store_release(&call->rx_consumed, tseq);
128 
129 	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);
130 
131 	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
132 			    serial, call->rx_consumed);
133 
134 	if (last)
135 		set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
136 
137 	/* Check to see if there's an ACK that needs sending. */
138 	acked = atomic_add_return(call->rx_consumed - old_consumed,
139 				  &call->ackr_nr_consumed);
140 	if (acked > 8 &&
141 	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
142 		rxrpc_poke_call(call, rxrpc_call_poke_idle);
143 }
144 
145 /*
146  * Decrypt and verify a DATA packet.
147  */
148 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
149 {
150 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
151 
152 	if (sp->flags & RXRPC_RX_VERIFIED)
153 		return 0;
154 	return call->security->verify_packet(call, skb);
155 }
156 
157 /*
158  * Transcribe a call's user ID to a control message.
159  */
160 static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg,
161 				 int flags)
162 {
163 	if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags))
164 		return 0;
165 
166 	if (flags & MSG_CMSG_COMPAT) {
167 		unsigned int id32 = call->user_call_ID;
168 
169 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
170 				sizeof(unsigned int), &id32);
171 	} else {
172 		unsigned long idl = call->user_call_ID;
173 
174 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
175 				sizeof(unsigned long), &idl);
176 	}
177 }
178 
179 /*
180  * Deal with a CHALLENGE packet.
181  */
182 static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg,
183 				   struct sk_buff *challenge, unsigned int flags)
184 {
185 	struct rxrpc_skb_priv *sp = rxrpc_skb(challenge);
186 	struct rxrpc_connection *conn = sp->chall.conn;
187 
188 	return conn->security->challenge_to_recvmsg(conn, challenge, msg);
189 }
190 
191 /*
192  * Process OOB packets.  Called with the socket locked.
193  */
194 static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg,
195 			     unsigned int flags)
196 {
197 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
198 	struct sk_buff *skb;
199 	bool need_response = false;
200 	int ret;
201 
202 	skb = skb_peek(&rx->recvmsg_oobq);
203 	if (!skb)
204 		return -EAGAIN;
205 	rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
206 
207 	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64),
208 		       &skb->skb_mstamp_ns);
209 	if (ret < 0)
210 		return ret;
211 
212 	switch ((enum rxrpc_oob_type)skb->mark) {
213 	case RXRPC_OOB_CHALLENGE:
214 		need_response = true;
215 		ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags);
216 		break;
217 	default:
218 		WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n",
219 			  skb->mark);
220 		ret = -EIO;
221 		break;
222 	}
223 
224 	if (!(flags & MSG_PEEK))
225 		skb_unlink(skb, &rx->recvmsg_oobq);
226 	if (need_response)
227 		rxrpc_add_pending_oob(rx, skb);
228 	else
229 		rxrpc_free_skb(skb, rxrpc_skb_put_oob);
230 	return ret;
231 }
232 
/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN and if the
 * call has failed it returns -EIO.
 *
 * @sock:    Socket the call belongs to.
 * @call:    Call whose recvmsg_queue is to be drained.
 * @msg:     Message header to receive packet timestamps; may be NULL, in which
 *           case no timestamp is recorded.
 * @iter:    Destination for the packet data.
 * @len:     Total size of the destination buffer.
 * @flags:   recvmsg flags; only MSG_PEEK is examined here.
 * @_offset: Running count of bytes already placed in the buffer; advanced by
 *           the amount copied.
 *
 * A negative error from packet verification or from the data copy is returned
 * as-is.  With MSG_PEEK, packets are not rotated out of the queue and the
 * call's saved intra-packet position is not updated.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	/* Resume from wherever the previous read stopped within a packet. */
	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

	if (rxrpc_call_has_failed(call)) {
		seq = call->ackr_window - 1;
		ret = -EIO;
		goto done;
	}

	/* The last packet has already been rotated out; nothing left to read. */
	if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
		seq = call->ackr_window - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		/* A zero offset means we're starting a fresh packet, so it
		 * has to be verified before its data range can be used.
		 */
		if (rx_pkt_offset == 0) {
			ret2 = rxrpc_verify_data(call, skb);
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     sp->offset, sp->len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
		} else {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		/* Data left in the packet means the buffer filled up first. */
		if (rx_pkt_len > 0) {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		/* Find the successor before rotating, as rotation dequeues and
		 * releases the packet currently at the head of the queue.
		 */
		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);

		/* Stop early if an OOB message is waiting on a socket that has
		 * no app_ops set.
		 */
		if (!rx->app_ops &&
		    !skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq,
					     rx_pkt_offset, rx_pkt_len, ret);
			break;
		}
	}

out:
	/* Only a real read moves the call's saved intra-packet position. */
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}

done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	/* -EAGAIN here means we ran out of queued packets. */
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}
356 
357 /*
358  * Receive a message from an RxRPC socket
359  * - we need to be careful about two or more threads calling recvmsg
360  *   simultaneously
361  */
362 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
363 		  int flags)
364 {
365 	struct rxrpc_call *call;
366 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
367 	struct list_head *l;
368 	unsigned int call_debug_id = 0;
369 	size_t copied = 0;
370 	long timeo;
371 	int ret;
372 
373 	DEFINE_WAIT(wait);
374 
375 	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
376 
377 	if (flags & (MSG_OOB | MSG_TRUNC))
378 		return -EOPNOTSUPP;
379 
380 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
381 
382 try_again:
383 	lock_sock(&rx->sk);
384 
385 	/* Return immediately if a client socket has no outstanding calls */
386 	if (RB_EMPTY_ROOT(&rx->calls) &&
387 	    list_empty(&rx->recvmsg_q) &&
388 	    skb_queue_empty_lockless(&rx->recvmsg_oobq) &&
389 	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
390 		release_sock(&rx->sk);
391 		return -EAGAIN;
392 	}
393 
394 	if (list_empty(&rx->recvmsg_q)) {
395 		ret = -EWOULDBLOCK;
396 		if (timeo == 0) {
397 			call = NULL;
398 			goto error_no_call;
399 		}
400 
401 		release_sock(&rx->sk);
402 
403 		/* Wait for something to happen */
404 		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
405 					  TASK_INTERRUPTIBLE);
406 		ret = sock_error(&rx->sk);
407 		if (ret)
408 			goto wait_error;
409 
410 		if (list_empty(&rx->recvmsg_q) &&
411 		    skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
412 			if (signal_pending(current))
413 				goto wait_interrupted;
414 			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
415 			timeo = schedule_timeout(timeo);
416 		}
417 		finish_wait(sk_sleep(&rx->sk), &wait);
418 		goto try_again;
419 	}
420 
421 	/* Deal with OOB messages before we consider getting normal data. */
422 	if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
423 		ret = rxrpc_recvmsg_oob(sock, msg, flags);
424 		release_sock(&rx->sk);
425 		if (ret == -EAGAIN)
426 			goto try_again;
427 		goto error_no_call;
428 	}
429 
430 	/* Find the next call and dequeue it if we're not just peeking.  If we
431 	 * do dequeue it, that comes with a ref that we will need to release.
432 	 * We also want to weed out calls that got requeued whilst we were
433 	 * shovelling data out.
434 	 */
435 	spin_lock_irq(&rx->recvmsg_lock);
436 	l = rx->recvmsg_q.next;
437 	call = list_entry(l, struct rxrpc_call, recvmsg_link);
438 
439 	if (!rxrpc_call_is_complete(call) &&
440 	    skb_queue_empty(&call->recvmsg_queue) &&
441 	    skb_queue_empty(&rx->recvmsg_oobq)) {
442 		list_del_init(&call->recvmsg_link);
443 		spin_unlock_irq(&rx->recvmsg_lock);
444 		release_sock(&rx->sk);
445 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
446 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
447 		goto try_again;
448 	}
449 
450 	if (!(flags & MSG_PEEK))
451 		list_del_init(&call->recvmsg_link);
452 	else
453 		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
454 	spin_unlock_irq(&rx->recvmsg_lock);
455 
456 	call_debug_id = call->debug_id;
457 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
458 
459 	/* We're going to drop the socket lock, so we need to lock the call
460 	 * against interference by sendmsg.
461 	 */
462 	if (!mutex_trylock(&call->user_mutex)) {
463 		ret = -EWOULDBLOCK;
464 		if (flags & MSG_DONTWAIT)
465 			goto error_requeue_call;
466 		ret = -ERESTARTSYS;
467 		if (mutex_lock_interruptible(&call->user_mutex) < 0)
468 			goto error_requeue_call;
469 	}
470 
471 	release_sock(&rx->sk);
472 
473 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
474 		BUG();
475 
476 	ret = rxrpc_recvmsg_user_id(call, msg, flags);
477 	if (ret < 0)
478 		goto error_unlock_call;
479 
480 	if (msg->msg_name && call->peer) {
481 		size_t len = sizeof(call->dest_srx);
482 
483 		memcpy(msg->msg_name, &call->dest_srx, len);
484 		msg->msg_namelen = len;
485 	}
486 
487 	ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
488 				 flags, &copied);
489 	if (ret == -EAGAIN)
490 		ret = 0;
491 	if (ret == -EIO)
492 		goto call_failed;
493 	if (ret < 0)
494 		goto error_unlock_call;
495 
496 	if (rxrpc_call_is_complete(call) &&
497 	    skb_queue_empty(&call->recvmsg_queue))
498 		goto call_complete;
499 	if (rxrpc_call_has_failed(call))
500 		goto call_failed;
501 
502 	if (!skb_queue_empty(&call->recvmsg_queue))
503 		rxrpc_notify_socket(call);
504 	goto not_yet_complete;
505 
506 call_failed:
507 	rxrpc_purge_queue(&call->recvmsg_queue);
508 call_complete:
509 	ret = rxrpc_recvmsg_term(call, msg);
510 	if (ret < 0)
511 		goto error_unlock_call;
512 	if (!(flags & MSG_PEEK))
513 		rxrpc_release_call(rx, call);
514 	msg->msg_flags |= MSG_EOR;
515 	ret = 1;
516 
517 not_yet_complete:
518 	if (ret == 0)
519 		msg->msg_flags |= MSG_MORE;
520 	else
521 		msg->msg_flags &= ~MSG_MORE;
522 	ret = copied;
523 
524 error_unlock_call:
525 	mutex_unlock(&call->user_mutex);
526 	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
527 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
528 	return ret;
529 
530 error_requeue_call:
531 	if (!(flags & MSG_PEEK)) {
532 		spin_lock_irq(&rx->recvmsg_lock);
533 		list_add(&call->recvmsg_link, &rx->recvmsg_q);
534 		spin_unlock_irq(&rx->recvmsg_lock);
535 		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
536 	} else {
537 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
538 	}
539 error_no_call:
540 	release_sock(&rx->sk);
541 error_trace:
542 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
543 	return ret;
544 
545 wait_interrupted:
546 	ret = sock_intr_errno(timeo);
547 wait_error:
548 	finish_wait(sk_sleep(&rx->sk), &wait);
549 	call = NULL;
550 	goto error_trace;
551 }
552 
553 /**
554  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
555  * @sock: The socket that the call exists on
556  * @call: The call to send data through
557  * @iter: The buffer to receive into
558  * @_len: The amount of data we want to receive (decreased on return)
559  * @want_more: True if more data is expected to be read
560  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
561  * @_service: Where to store the actual service ID (may be upgraded)
562  *
563  * Allow a kernel service to receive data and pick up information about the
564  * state of a call.  Note that *@_abort should also be initialised to %0.
565  *
566  * Note that we may return %-EAGAIN to drain empty packets at the end
567  * of the data, even if we've already copied over the requested data.
568  *
569  * Return: %0 if got what was asked for and there's more available, %1
570  * if we got what was asked for and we're at the end of the data and
571  * %-EAGAIN if we need more data.
572  */
573 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
574 			   struct iov_iter *iter, size_t *_len,
575 			   bool want_more, u32 *_abort, u16 *_service)
576 {
577 	size_t offset = 0;
578 	int ret;
579 
580 	_enter("{%d},%zu,%d", call->debug_id, *_len, want_more);
581 
582 	mutex_lock(&call->user_mutex);
583 
584 	ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
585 	*_len -= offset;
586 	if (ret == -EIO)
587 		goto call_failed;
588 	if (ret < 0)
589 		goto out;
590 
591 	/* We can only reach here with a partially full buffer if we have
592 	 * reached the end of the data.  We must otherwise have a full buffer
593 	 * or have been given -EAGAIN.
594 	 */
595 	if (ret == 1) {
596 		if (iov_iter_count(iter) > 0)
597 			goto short_data;
598 		if (!want_more)
599 			goto read_phase_complete;
600 		ret = 0;
601 		goto out;
602 	}
603 
604 	if (!want_more)
605 		goto excess_data;
606 	goto out;
607 
608 read_phase_complete:
609 	ret = 1;
610 out:
611 	if (_service)
612 		*_service = call->dest_srx.srx_service;
613 	mutex_unlock(&call->user_mutex);
614 	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
615 	return ret;
616 
617 short_data:
618 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
619 			  call->cid, call->call_id, call->rx_consumed,
620 			  0, -EBADMSG);
621 	ret = -EBADMSG;
622 	goto out;
623 excess_data:
624 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
625 			  call->cid, call->call_id, call->rx_consumed,
626 			  0, -EMSGSIZE);
627 	ret = -EMSGSIZE;
628 	goto out;
629 call_failed:
630 	*_abort = call->abort_code;
631 	ret = call->error;
632 	if (call->completion == RXRPC_CALL_SUCCEEDED) {
633 		ret = 1;
634 		if (iov_iter_count(iter) > 0)
635 			ret = -ECONNRESET;
636 	}
637 	goto out;
638 }
639 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
640