1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25 
26 
put_free_de(struct dlm_ls * ls,struct dlm_direntry * de)27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29 	spin_lock(&ls->ls_recover_list_lock);
30 	list_add(&de->list, &ls->ls_recover_list);
31 	spin_unlock(&ls->ls_recover_list_lock);
32 }
33 
get_free_de(struct dlm_ls * ls,int len)34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36 	int found = 0;
37 	struct dlm_direntry *de;
38 
39 	spin_lock(&ls->ls_recover_list_lock);
40 	list_for_each_entry(de, &ls->ls_recover_list, list) {
41 		if (de->length == len) {
42 			list_del(&de->list);
43 			de->master_nodeid = 0;
44 			memset(de->name, 0, len);
45 			found = 1;
46 			break;
47 		}
48 	}
49 	spin_unlock(&ls->ls_recover_list_lock);
50 
51 	if (!found)
52 		de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS);
53 	return de;
54 }
55 
dlm_clear_free_entries(struct dlm_ls * ls)56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58 	struct dlm_direntry *de;
59 
60 	spin_lock(&ls->ls_recover_list_lock);
61 	while (!list_empty(&ls->ls_recover_list)) {
62 		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63 				list);
64 		list_del(&de->list);
65 		kfree(de);
66 	}
67 	spin_unlock(&ls->ls_recover_list_lock);
68 }
69 
/*
 * We use the upper 16 bits of the hash value to select the directory node.
 * Low bits are used for distribution of rsb's among hash buckets on each node.
 *
 * To give the exact range wanted, we apply a modulus to those upper bits.
 * When ls_node_array is available the modulus is ls_total_weight and the
 * result indexes that array directly.  Otherwise the modulus is num_nodes,
 * and the resulting value (0 to num_nodes-1) is used as an offset into the
 * sorted list of nodeid's to give the particular nodeid.
 */
78 
dlm_hash2nodeid(struct dlm_ls * ls,uint32_t hash)79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81 	struct list_head *tmp;
82 	struct dlm_member *memb = NULL;
83 	uint32_t node, n = 0;
84 	int nodeid;
85 
86 	if (ls->ls_num_nodes == 1) {
87 		nodeid = dlm_our_nodeid();
88 		goto out;
89 	}
90 
91 	if (ls->ls_node_array) {
92 		node = (hash >> 16) % ls->ls_total_weight;
93 		nodeid = ls->ls_node_array[node];
94 		goto out;
95 	}
96 
97 	/* make_member_array() failed to kmalloc ls_node_array... */
98 
99 	node = (hash >> 16) % ls->ls_num_nodes;
100 
101 	list_for_each(tmp, &ls->ls_nodes) {
102 		if (n++ != node)
103 			continue;
104 		memb = list_entry(tmp, struct dlm_member, list);
105 		break;
106 	}
107 
108 	DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109 				 ls->ls_num_nodes, n, node););
110 	nodeid = memb->nodeid;
111  out:
112 	return nodeid;
113 }
114 
dlm_dir_nodeid(struct dlm_rsb * r)115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117 	return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119 
dir_hash(struct dlm_ls * ls,char * name,int len)120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122 	uint32_t val;
123 
124 	val = jhash(name, len, 0);
125 	val &= (ls->ls_dirtbl_size - 1);
126 
127 	return val;
128 }
129 
add_entry_to_hash(struct dlm_ls * ls,struct dlm_direntry * de)130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132 	uint32_t bucket;
133 
134 	bucket = dir_hash(ls, de->name, de->length);
135 	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137 
search_bucket(struct dlm_ls * ls,char * name,int namelen,uint32_t bucket)138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139 					  int namelen, uint32_t bucket)
140 {
141 	struct dlm_direntry *de;
142 
143 	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144 		if (de->length == namelen && !memcmp(name, de->name, namelen))
145 			goto out;
146 	}
147 	de = NULL;
148  out:
149 	return de;
150 }
151 
dlm_dir_remove_entry(struct dlm_ls * ls,int nodeid,char * name,int namelen)152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154 	struct dlm_direntry *de;
155 	uint32_t bucket;
156 
157 	bucket = dir_hash(ls, name, namelen);
158 
159 	spin_lock(&ls->ls_dirtbl[bucket].lock);
160 
161 	de = search_bucket(ls, name, namelen, bucket);
162 
163 	if (!de) {
164 		log_error(ls, "remove fr %u none", nodeid);
165 		goto out;
166 	}
167 
168 	if (de->master_nodeid != nodeid) {
169 		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170 		goto out;
171 	}
172 
173 	list_del(&de->list);
174 	kfree(de);
175  out:
176 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178 
dlm_dir_clear(struct dlm_ls * ls)179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181 	struct list_head *head;
182 	struct dlm_direntry *de;
183 	int i;
184 
185 	DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186 
187 	for (i = 0; i < ls->ls_dirtbl_size; i++) {
188 		spin_lock(&ls->ls_dirtbl[i].lock);
189 		head = &ls->ls_dirtbl[i].list;
190 		while (!list_empty(head)) {
191 			de = list_entry(head->next, struct dlm_direntry, list);
192 			list_del(&de->list);
193 			put_free_de(ls, de);
194 		}
195 		spin_unlock(&ls->ls_dirtbl[i].lock);
196 	}
197 }
198 
dlm_recover_directory(struct dlm_ls * ls)199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201 	struct dlm_member *memb;
202 	struct dlm_direntry *de;
203 	char *b, *last_name = NULL;
204 	int error = -ENOMEM, last_len, count = 0;
205 	uint16_t namelen;
206 
207 	log_debug(ls, "dlm_recover_directory");
208 
209 	if (dlm_no_directory(ls))
210 		goto out_status;
211 
212 	dlm_dir_clear(ls);
213 
214 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
215 	if (!last_name)
216 		goto out;
217 
218 	list_for_each_entry(memb, &ls->ls_nodes, list) {
219 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
220 		last_len = 0;
221 
222 		for (;;) {
223 			int left;
224 			error = dlm_recovery_stopped(ls);
225 			if (error)
226 				goto out_free;
227 
228 			error = dlm_rcom_names(ls, memb->nodeid,
229 					       last_name, last_len);
230 			if (error)
231 				goto out_free;
232 
233 			schedule();
234 
235 			/*
236 			 * pick namelen/name pairs out of received buffer
237 			 */
238 
239 			b = ls->ls_recover_buf->rc_buf;
240 			left = ls->ls_recover_buf->rc_header.h_length;
241 			left -= sizeof(struct dlm_rcom);
242 
243 			for (;;) {
244 				__be16 v;
245 
246 				error = -EINVAL;
247 				if (left < sizeof(__be16))
248 					goto out_free;
249 
250 				memcpy(&v, b, sizeof(__be16));
251 				namelen = be16_to_cpu(v);
252 				b += sizeof(__be16);
253 				left -= sizeof(__be16);
254 
255 				/* namelen of 0xFFFFF marks end of names for
256 				   this node; namelen of 0 marks end of the
257 				   buffer */
258 
259 				if (namelen == 0xFFFF)
260 					goto done;
261 				if (!namelen)
262 					break;
263 
264 				if (namelen > left)
265 					goto out_free;
266 
267 				if (namelen > DLM_RESNAME_MAXLEN)
268 					goto out_free;
269 
270 				error = -ENOMEM;
271 				de = get_free_de(ls, namelen);
272 				if (!de)
273 					goto out_free;
274 
275 				de->master_nodeid = memb->nodeid;
276 				de->length = namelen;
277 				last_len = namelen;
278 				memcpy(de->name, b, namelen);
279 				memcpy(last_name, b, namelen);
280 				b += namelen;
281 				left -= namelen;
282 
283 				add_entry_to_hash(ls, de);
284 				count++;
285 			}
286 		}
287          done:
288 		;
289 	}
290 
291  out_status:
292 	error = 0;
293 	log_debug(ls, "dlm_recover_directory %d entries", count);
294  out_free:
295 	kfree(last_name);
296  out:
297 	dlm_clear_free_entries(ls);
298 	return error;
299 }
300 
get_entry(struct dlm_ls * ls,int nodeid,char * name,int namelen,int * r_nodeid)301 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
302 		     int namelen, int *r_nodeid)
303 {
304 	struct dlm_direntry *de, *tmp;
305 	uint32_t bucket;
306 
307 	bucket = dir_hash(ls, name, namelen);
308 
309 	spin_lock(&ls->ls_dirtbl[bucket].lock);
310 	de = search_bucket(ls, name, namelen, bucket);
311 	if (de) {
312 		*r_nodeid = de->master_nodeid;
313 		spin_unlock(&ls->ls_dirtbl[bucket].lock);
314 		if (*r_nodeid == nodeid)
315 			return -EEXIST;
316 		return 0;
317 	}
318 
319 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
320 
321 	if (namelen > DLM_RESNAME_MAXLEN)
322 		return -EINVAL;
323 
324 	de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS);
325 	if (!de)
326 		return -ENOMEM;
327 
328 	de->master_nodeid = nodeid;
329 	de->length = namelen;
330 	memcpy(de->name, name, namelen);
331 
332 	spin_lock(&ls->ls_dirtbl[bucket].lock);
333 	tmp = search_bucket(ls, name, namelen, bucket);
334 	if (tmp) {
335 		kfree(de);
336 		de = tmp;
337 	} else {
338 		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
339 	}
340 	*r_nodeid = de->master_nodeid;
341 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
342 	return 0;
343 }
344 
/*
 * Public directory lookup: forwards to get_entry() and passes its result
 * (and the *r_nodeid it fills in) straight back to the caller.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	int rv = get_entry(ls, nodeid, name, namelen, r_nodeid);

	return rv;
}
350 
find_rsb_root(struct dlm_ls * ls,char * name,int len)351 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
352 {
353 	struct dlm_rsb *r;
354 
355 	down_read(&ls->ls_root_sem);
356 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
357 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
358 			up_read(&ls->ls_root_sem);
359 			return r;
360 		}
361 	}
362 	up_read(&ls->ls_root_sem);
363 	return NULL;
364 }
365 
366 /* Find the rsb where we left off (or start again), then send rsb names
367    for rsb's we're master of and whose directory node matches the requesting
368    node.  inbuf is the rsb name last sent, inlen is the name's length */
369 
dlm_copy_master_names(struct dlm_ls * ls,char * inbuf,int inlen,char * outbuf,int outlen,int nodeid)370 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
371  			   char *outbuf, int outlen, int nodeid)
372 {
373 	struct list_head *list;
374 	struct dlm_rsb *r;
375 	int offset = 0, dir_nodeid;
376 	__be16 be_namelen;
377 
378 	down_read(&ls->ls_root_sem);
379 
380 	if (inlen > 1) {
381 		r = find_rsb_root(ls, inbuf, inlen);
382 		if (!r) {
383 			inbuf[inlen - 1] = '\0';
384 			log_error(ls, "copy_master_names from %d start %d %s",
385 				  nodeid, inlen, inbuf);
386 			goto out;
387 		}
388 		list = r->res_root_list.next;
389 	} else {
390 		list = ls->ls_root_list.next;
391 	}
392 
393 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
394 		r = list_entry(list, struct dlm_rsb, res_root_list);
395 		if (r->res_nodeid)
396 			continue;
397 
398 		dir_nodeid = dlm_dir_nodeid(r);
399 		if (dir_nodeid != nodeid)
400 			continue;
401 
402 		/*
403 		 * The block ends when we can't fit the following in the
404 		 * remaining buffer space:
405 		 * namelen (uint16_t) +
406 		 * name (r->res_length) +
407 		 * end-of-block record 0x0000 (uint16_t)
408 		 */
409 
410 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
411 			/* Write end-of-block record */
412 			be_namelen = cpu_to_be16(0);
413 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
414 			offset += sizeof(__be16);
415 			goto out;
416 		}
417 
418 		be_namelen = cpu_to_be16(r->res_length);
419 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
420 		offset += sizeof(__be16);
421 		memcpy(outbuf + offset, r->res_name, r->res_length);
422 		offset += r->res_length;
423 	}
424 
425 	/*
426 	 * If we've reached the end of the list (and there's room) write a
427 	 * terminating record.
428 	 */
429 
430 	if ((list == &ls->ls_root_list) &&
431 	    (offset + sizeof(uint16_t) <= outlen)) {
432 		be_namelen = cpu_to_be16(0xFFFF);
433 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
434 		offset += sizeof(__be16);
435 	}
436 
437  out:
438 	up_read(&ls->ls_root_sem);
439 }
440 
441