// SPDX-License-Identifier: GPL-2.0-or-later
/* In-kernel rxperf server for testing purposes.
 *
 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) "rxperf: " fmt
#include <linux/module.h>
#include <linux/slab.h>
#include <crypto/krb5.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
#include <trace/events/rxrpc.h>

MODULE_DESCRIPTION("rxperf test server (afs)");
MODULE_AUTHOR("Red Hat, Inc.");
MODULE_LICENSE("GPL");

#define RXPERF_PORT		7009
#define RX_PERF_SERVICE		147
#define RX_PERF_VERSION		3
#define RX_PERF_SEND		0
#define RX_PERF_RECV		1
#define RX_PERF_RPC		3
#define RX_PERF_FILE		4
#define RX_PERF_MAGIC_COOKIE	0x4711

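/* The fixed-size parameter block that an rxperf client sends at the start of
 * each call.  All fields are in network byte order.
 */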
struct rxperf_proto_params {
	__be32		version;
	__be32		type;
	__be32		rsize;
	__be32		wsize;
} __packed;

static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 };
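/* Dummy shared secret used as the rxkad server key (for testing only). */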
static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 };

enum rxperf_call_state {
	RXPERF_CALL_SV_AWAIT_PARAMS,	/* Server: Awaiting parameter block */
	RXPERF_CALL_SV_AWAIT_REQUEST,	/* Server: Awaiting request data */
	RXPERF_CALL_SV_REPLYING,	/* Server: Replying */
	RXPERF_CALL_SV_AWAIT_ACK,	/* Server: Awaiting final ACK */
	RXPERF_CALL_COMPLETE,		/* Completed or failed */
};

struct rxperf_call {
	struct rxrpc_call	*rxcall;
	struct iov_iter		iter;
	struct kvec		kvec[1];
	struct work_struct	work;
	const char		*type;
	size_t			iov_len;
	size_t			req_len;	/* Size of request blob */
	size_t			reply_len;	/* Size of reply blob */
	unsigned int		debug_id;
	unsigned int		operation_id;
	struct rxperf_proto_params params;
	__be32			tmp[2];		/* Scratch space for decoding sizes */
	s32			abort_code;
	enum rxperf_call_state	state;
	short			error;
	unsigned short		unmarshal;	/* Unmarshalling phase counter */
	u16			service_id;
	int (*deliver)(struct rxperf_call *call);
	void (*processor)(struct work_struct *work);
};

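/* Server-wide state: one listening socket, a keyring of server security keys
 * and a workqueue on which call work is processed.
 */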
static struct socket *rxperf_socket;
static struct key *rxperf_sec_keyring;	/* Ring of security/crypto keys */
static struct workqueue_struct *rxperf_workqueue;

static void rxperf_deliver_to_call(struct work_struct *work);
static int rxperf_deliver_param_block(struct rxperf_call *call);
static int rxperf_deliver_request(struct rxperf_call *call);
static int rxperf_process_call(struct rxperf_call *call);
static void rxperf_charge_preallocation(struct work_struct *work);

static DECLARE_WORK(rxperf_charge_preallocation_work,
		    rxperf_charge_preallocation);

static inline void rxperf_set_call_state(struct rxperf_call *call,
					 enum rxperf_call_state to)
{
	call->state = to;
}

static inline void rxperf_set_call_complete(struct rxperf_call *call,
					    int error, s32 remote_abort)
{
	if (call->state != RXPERF_CALL_COMPLETE) {
		call->abort_code = remote_abort;
		call->error = error;
		call->state = RXPERF_CALL_COMPLETE;
	}
}

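/*
 * Discard a preallocated call record that rxrpc decided not to use, freeing
 * the memory that we attached as the user call ID.
 */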
static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall,
				       unsigned long user_call_ID)
{
	kfree((struct rxperf_call *)user_call_ID);
}

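/*
 * A new call has arrived and consumed one of our preallocated call records;
 * kick the work item that tops the preallocation pool back up.
 */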
static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			       unsigned long user_call_ID)
{
	queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work);
}

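/*
 * Queue a call's work item to have its pending events processed.
 */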
static void rxperf_queue_call_work(struct rxperf_call *call)
{
	queue_work(rxperf_workqueue, &call->work);
}

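/*
 * Notification from rxrpc that there's something to deal with on a call;
 * ignored once the call has completed.
 */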
static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
			     unsigned long call_user_ID)
{
	struct rxperf_call *call = (struct rxperf_call *)call_user_ID;

	if (call->state != RXPERF_CALL_COMPLETE)
		rxperf_queue_call_work(call);
}

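/*
 * Attach an incoming call to the preallocated rxperf_call record whose
 * address serves as the user call ID.
 */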
static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct rxperf_call *call = (struct rxperf_call *)user_call_ID;

	call->rxcall = rxcall;
}

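/*
 * rxrpc has queued the last packet of the reply for transmission; the call
 * now waits for the final ACK from the peer.
 */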
static void rxperf_notify_end_reply_tx(struct sock *sock,
				       struct rxrpc_call *rxcall,
				       unsigned long call_user_ID)
{
	rxperf_set_call_state((struct rxperf_call *)call_user_ID,
			      RXPERF_CALL_SV_AWAIT_ACK);
}

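/*
 * Operations that rxrpc calls back into to handle service-side events on our
 * socket.
 */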
static const struct rxrpc_kernel_ops rxperf_rxrpc_callback_ops = {
	.notify_new_call	= rxperf_rx_new_call,
	.discard_new_call	= rxperf_rx_discard_new_call,
	.user_attach_call	= rxperf_rx_attach,
};

/*
 * Charge the incoming call preallocation.
 */
static void rxperf_charge_preallocation(struct work_struct *work)
{
	struct rxperf_call *call;

	for (;;) {
		call = kzalloc(sizeof(*call), GFP_KERNEL);
		if (!call)
			break;

		call->type		= "unset";
		call->debug_id		= atomic_inc_return(&rxrpc_debug_id);
		call->deliver		= rxperf_deliver_param_block;
		call->state		= RXPERF_CALL_SV_AWAIT_PARAMS;
		call->service_id	= RX_PERF_SERVICE;
		call->iov_len		= sizeof(call->params);
		call->kvec[0].iov_len	= sizeof(call->params);
		call->kvec[0].iov_base	= &call->params;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		INIT_WORK(&call->work, rxperf_deliver_to_call);

		if (rxrpc_kernel_charge_accept(rxperf_socket,
					       rxperf_notify_rx,
					       (unsigned long)call,
					       GFP_KERNEL,
					       call->debug_id) < 0)
			break;
		call = NULL;
	}

	/* Free the record we failed to charge, if any. */
	kfree(call);
}

/*
 * Open an rxrpc socket and bind it to be the rxperf server
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
static int rxperf_open_socket(void)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6,
			       &socket);
	if (ret < 0)
		goto error_1;

	socket->sk->sk_allocation = GFP_NOFS;

	/* Bind the local address to make this a server socket. */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= RX_PERF_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(RXPERF_PORT);

	ret = rxrpc_sock_set_min_security_level(socket->sk,
						RXRPC_SECURITY_ENCRYPT);
	if (ret < 0)
		goto error_2;

	ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring);
	if (ret < 0)
		goto error_2;

	ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

	rxrpc_kernel_set_notifications(socket, &rxperf_rxrpc_callback_ops);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	rxperf_socket = socket;
	rxperf_charge_preallocation(&rxperf_charge_preallocation_work);
	return 0;

error_2:
	sock_release(socket);
error_1:
	pr_err("Can't set up rxperf socket: %d\n", ret);
	return ret;
}

/*
 * Close the rxrpc socket that rxperf was using.
 */
static void rxperf_close_socket(void)
{
	kernel_listen(rxperf_socket, 0);
	kernel_sock_shutdown(rxperf_socket, SHUT_RDWR);
	flush_workqueue(rxperf_workqueue);
	sock_release(rxperf_socket);
}

/*
 * Log remote abort codes that indicate that we have a protocol disagreement
 * with the peer.
 */
static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
{
	static int max = 0;
	const char *msg;
	int m;

	switch (remote_abort) {
	case RX_EOF:		 msg = "unexpected EOF";	break;
	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
	case RXGEN_DECODE:	 msg = "opcode decode";		break;
	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
	case -32:		 msg = "insufficient data";	break;
	default:
		return;
	}

	/* Only log the first few failures to avoid spamming the log. */
	m = max;
	if (m < 3) {
		max = m + 1;
		pr_info("Peer reported %s failure on %s\n", msg, call->type);
	}
}

/*
 * Deliver messages to a call.
 */
static void rxperf_deliver_to_call(struct work_struct *work)
{
	struct rxperf_call *call = container_of(work, struct rxperf_call, work);
	enum rxperf_call_state state;
	u32 abort_code, remote_abort = 0;
	int ret = 0;

	if (call->state == RXPERF_CALL_COMPLETE)
		return;

	while (state = call->state,
	       state == RXPERF_CALL_SV_AWAIT_PARAMS ||
	       state == RXPERF_CALL_SV_AWAIT_REQUEST ||
	       state == RXPERF_CALL_SV_AWAIT_ACK
	       ) {
		if (state == RXPERF_CALL_SV_AWAIT_ACK) {
			if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
				goto call_complete;
			return;
		}

		ret = call->deliver(call);
		if (ret == 0)
			ret = rxperf_process_call(call);

		switch (ret) {
		case 0:
			continue;
		case -EINPROGRESS:
		case -EAGAIN:
			return;
		case -ECONNABORTED:
			rxperf_log_error(call, call->abort_code);
			goto call_complete;
		case -EOPNOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret,
						rxperf_abort_op_not_supported);
			goto call_complete;
		case -ENOTSUPP:
			abort_code = RX_USER_ABORT;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret,
						rxperf_abort_op_not_supported);
			goto call_complete;
		case -EIO:
			pr_err("Call %u in bad state %u\n",
			       call->debug_id, call->state);
			fallthrough;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		case -ENOMEM:
		case -EFAULT:
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RXGEN_SS_UNMARSHAL, ret,
						rxperf_abort_unmarshal_error);
			goto call_complete;
		default:
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RX_CALL_DEAD, ret,
						rxperf_abort_general_error);
			goto call_complete;
		}
	}

call_complete:
	rxperf_set_call_complete(call, ret, remote_abort);
	/* The call may have been requeued */
	rxrpc_kernel_shutdown_call(rxperf_socket, call->rxcall);
	rxrpc_kernel_put_call(rxperf_socket, call->rxcall);
	cancel_work(&call->work);
	kfree(call);
}

/*
 * Extract a piece of data from the received data socket buffers.
 */
static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
{
	u32 remote_abort = 0;
	int ret;

	ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
				     &call->iov_len, want_more, &remote_abort,
				     &call->service_id);
	pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
		 iov_iter_count(&call->iter), call->iov_len, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	if (ret == 1) {
		switch (call->state) {
		case RXPERF_CALL_SV_AWAIT_REQUEST:
			rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
			break;
		case RXPERF_CALL_COMPLETE:
			pr_debug("premature completion %d\n", call->error);
			return call->error;
		default:
			break;
		}
		return 0;
	}

	rxperf_set_call_complete(call, ret, remote_abort);
	return ret;
}

/*
 * Grab the operation ID from an incoming call's parameter block.
 */
static int rxperf_deliver_param_block(struct rxperf_call *call)
{
	u32 version;
	int ret;

	/* Extract the parameter block */
	ret = rxperf_extract_data(call, true);
	if (ret < 0)
		return ret;

	version			= ntohl(call->params.version);
	call->operation_id	= ntohl(call->params.type);
	call->deliver		= rxperf_deliver_request;

	if (version != RX_PERF_VERSION) {
		pr_info("Version mismatch %x\n", version);
		return -ENOTSUPP;
	}

	switch (call->operation_id) {
	case RX_PERF_SEND:
		call->type = "send";
		call->reply_len = 0;
		call->iov_len = 4;	/* Expect req size */
		break;
	case RX_PERF_RECV:
		call->type = "recv";
		call->req_len = 0;
		call->iov_len = 4;	/* Expect reply size */
		break;
	case RX_PERF_RPC:
		call->type = "rpc";
		call->iov_len = 8;	/* Expect req size and reply size */
		break;
	case RX_PERF_FILE:
		call->type = "file";
		fallthrough;
	default:
		return -EOPNOTSUPP;
	}

	rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST);
	return call->deliver(call);
}

/*
 * Deliver the request data.
 */
static int rxperf_deliver_request(struct rxperf_call *call)
{
	int ret;

	switch (call->unmarshal) {
	case 0:
		/* Set up to read the request/reply size parameters. */
		call->kvec[0].iov_len	= call->iov_len;
		call->kvec[0].iov_base	= call->tmp;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		call->unmarshal++;
		fallthrough;
	case 1:
		ret = rxperf_extract_data(call, true);
		if (ret < 0)
			return ret;

		switch (call->operation_id) {
		case RX_PERF_SEND:
			call->type = "send";
			call->req_len	= ntohl(call->tmp[0]);
			call->reply_len	= 0;
			break;
		case RX_PERF_RECV:
			call->type = "recv";
			call->req_len = 0;
			call->reply_len	= ntohl(call->tmp[0]);
			break;
		case RX_PERF_RPC:
			call->type = "rpc";
			call->req_len	= ntohl(call->tmp[0]);
			call->reply_len	= ntohl(call->tmp[1]);
			break;
		default:
			pr_info("Can't parse extra params\n");
			return -EIO;
		}

		pr_debug("CALL op=%s rq=%zx rp=%zx\n",
			 call->type, call->req_len, call->reply_len);

		/* Set up to discard the request body. */
		call->iov_len = call->req_len;
		iov_iter_discard(&call->iter, READ, call->req_len);
		call->unmarshal++;
		fallthrough;
	case 2:
		ret = rxperf_extract_data(call, true);
		if (ret < 0)
			return ret;

		/* Deal with the terminal magic cookie. */
		call->iov_len = 4;
		call->kvec[0].iov_len	= call->iov_len;
		call->kvec[0].iov_base	= call->tmp;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		call->unmarshal++;
		fallthrough;
	case 3:
		ret = rxperf_extract_data(call, false);
		if (ret < 0)
			return ret;
		call->unmarshal++;
		fallthrough;
	default:
		return 0;
	}
}

/*
 * Process a call for which we've received the request.
 */
static int rxperf_process_call(struct rxperf_call *call)
{
	struct msghdr msg = {};
	struct bio_vec bv;
	struct kvec iov[1];
	ssize_t n;
	size_t reply_len = call->reply_len, len;

	rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
				   reply_len + sizeof(rxperf_magic_cookie));

	/* Generate the reply as a string of zero-filled pages. */
	while (reply_len > 0) {
		len = umin(reply_len, PAGE_SIZE);
		bvec_set_page(&bv, ZERO_PAGE(0), len, 0);
		iov_iter_bvec(&msg.msg_iter, WRITE, &bv, 1, len);
		msg.msg_flags = MSG_MORE;
		n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
					   len, rxperf_notify_end_reply_tx);
		if (n < 0)
			return n;
		if (n == 0)
			return -EIO;
		reply_len -= n;
	}

	/* Send the terminal magic cookie. */
	len = sizeof(rxperf_magic_cookie);
	iov[0].iov_base	= (void *)rxperf_magic_cookie;
	iov[0].iov_len	= len;
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
	msg.msg_flags = 0;
	n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
				   rxperf_notify_end_reply_tx);
	if (n >= 0)
		return 0; /* Success */

	if (n == -ENOMEM)
		rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
					RXGEN_SS_MARSHAL, -ENOMEM,
					rxperf_abort_oom);
	return n;
}

/*
 * Add an rxkad key to the security keyring.
 */
static int rxperf_add_rxkad_key(struct key *keyring)
{
	key_ref_t kref;
	int ret;

	/* The description is "<service ID>:<security index>"; 2 is rxkad. */
	kref = key_create_or_update(make_key_ref(keyring, true),
				    "rxrpc_s",
				    __stringify(RX_PERF_SERVICE) ":2",
				    secret,
				    sizeof(secret),
				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
				    | KEY_USR_VIEW,
				    KEY_ALLOC_NOT_IN_QUOTA);

	if (IS_ERR(kref)) {
		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
		return PTR_ERR(kref);
	}

	ret = key_link(keyring, key_ref_to_ptr(kref));
	if (ret < 0)
		pr_err("Can't link rxperf server key: %d\n", ret);
	key_ref_put(kref);
	return ret;
}

#ifdef CONFIG_RXGK
/*
 * Add a yfs-rxgk key to the security keyring.
 */
static int rxperf_add_yfs_rxgk_key(struct key *keyring, u32 enctype)
{
	const struct krb5_enctype *krb5 = crypto_krb5_find_enctype(enctype);
	key_ref_t kref;
	char name[64];
	int ret;
	u8 key[32];

	if (!krb5 || krb5->key_len > sizeof(key))
		return 0;

	/* The key is just { 0, 1, 2, 3, 4, ... } */
	for (int i = 0; i < krb5->key_len; i++)
		key[i] = i;

	sprintf(name, "%u:6:1:%u", RX_PERF_SERVICE, enctype);

	kref = key_create_or_update(make_key_ref(keyring, true),
				    "rxrpc_s", name,
				    key, krb5->key_len,
				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
				    KEY_USR_VIEW,
				    KEY_ALLOC_NOT_IN_QUOTA);

	if (IS_ERR(kref)) {
		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
		return PTR_ERR(kref);
	}

	ret = key_link(keyring, key_ref_to_ptr(kref));
	if (ret < 0)
		pr_err("Can't link rxperf server key: %d\n", ret);
	key_ref_put(kref);
	return ret;
}
#endif

/*
 * Initialise the rxperf server.
 */
static int __init rxperf_init(void)
{
	struct key *keyring;
	int ret = -ENOMEM;

	pr_info("Server registering\n");

	rxperf_workqueue = alloc_workqueue("rxperf", 0, 0);
	if (!rxperf_workqueue)
		goto error_workqueue;

	keyring = keyring_alloc("rxperf_server",
				GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
				KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
				KEY_POS_WRITE |
				KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
				KEY_USR_WRITE |
				KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
				KEY_ALLOC_NOT_IN_QUOTA,
				NULL, NULL);
	if (IS_ERR(keyring)) {
		pr_err("Can't allocate rxperf server keyring: %ld\n",
		       PTR_ERR(keyring));
		goto error_keyring;
	}
	rxperf_sec_keyring = keyring;
	ret = rxperf_add_rxkad_key(keyring);
	if (ret < 0)
		goto error_key;
#ifdef CONFIG_RXGK
	ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES128_CTS_HMAC_SHA1_96);
	if (ret < 0)
		goto error_key;
	ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES256_CTS_HMAC_SHA1_96);
	if (ret < 0)
		goto error_key;
	ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES128_CTS_HMAC_SHA256_128);
	if (ret < 0)
		goto error_key;
	ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_AES256_CTS_HMAC_SHA384_192);
	if (ret < 0)
		goto error_key;
	ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_CAMELLIA128_CTS_CMAC);
	if (ret < 0)
		goto error_key;
	ret = rxperf_add_yfs_rxgk_key(keyring, KRB5_ENCTYPE_CAMELLIA256_CTS_CMAC);
	if (ret < 0)
		goto error_key;
#endif

	ret = rxperf_open_socket();
	if (ret < 0)
		goto error_socket;
	return 0;

error_socket:
error_key:
	key_put(rxperf_sec_keyring);
error_keyring:
	destroy_workqueue(rxperf_workqueue);
	rcu_barrier();
error_workqueue:
	pr_err("Failed to register: %d\n", ret);
	return ret;
}
late_initcall(rxperf_init); /* Must be called after net/ to create socket */

static void __exit rxperf_exit(void)
{
	pr_info("Server unregistering.\n");

	rxperf_close_socket();
	key_put(rxperf_sec_keyring);
	destroy_workqueue(rxperf_workqueue);
	rcu_barrier();
}
module_exit(rxperf_exit);