xref: /qemu/net/colo-compare.c (revision db1015e92e04835c9eb50c29625fe566d1202dbd)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or
12  * later.  See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "qemu/error-report.h"
18 #include "trace.h"
19 #include "qapi/error.h"
20 #include "net/net.h"
21 #include "net/eth.h"
22 #include "qom/object_interfaces.h"
23 #include "qemu/iov.h"
24 #include "qom/object.h"
25 #include "net/queue.h"
26 #include "chardev/char-fe.h"
27 #include "qemu/sockets.h"
28 #include "colo.h"
29 #include "sysemu/iothread.h"
30 #include "net/colo-compare.h"
31 #include "migration/colo.h"
32 #include "migration/migration.h"
33 #include "util.h"
34 
35 #include "block/aio-wait.h"
36 #include "qemu/coroutine.h"
37 
/* QOM type name of the colo-compare object. */
#define TYPE_COLO_COMPARE "colo-compare"
typedef struct CompareState CompareState;
/* QOM cast from an Object pointer to a CompareState pointer. */
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/*
 * CompareState instances linked through their 'next' field.
 * colo_notify_compares_event() walks this list with colo_compare_mutex held.
 */
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);

/*
 * Notifiers invoked by colo_compare_inconsistency_notify() when no
 * notify_dev is configured; managed through
 * colo_compare_register_notifier() / colo_compare_unregister_notifier().
 */
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
48 
/* Largest chunk compare_chr_can_read() lets a chardev deliver at once. */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* NOTE(review): presumably the default for max_queue_size (set outside this view). */
#define MAX_QUEUE_SIZE 1024

/* Bits reported through 'mark' by colo_mark_tcp_pkt(). */
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

/*
 * NOTE(review): presumably the defaults (in milliseconds) for
 * expired_scan_cycle and compare_timeout; not referenced in this view.
 */
#define REGULAR_PACKET_CHECK_MS 3000
#define DEFAULT_TIME_OUT_MS 3000

/*
 * colo_compare_mutex is held by colo_notify_compares_event() while it reads
 * colo_compare_active and walks net_compares.
 */
static QemuMutex colo_compare_mutex;
static bool colo_compare_active;
/*
 * event_mtx protects event_unhandled_count; colo_notify_compares_event()
 * waits on event_complete_cond until that count drops back to zero.
 */
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;
/* Per-list packet limit checked by colo_insert_packet(). */
static uint32_t max_queue_size;
64 
65 /*
66  *  + CompareState ++
67  *  |               |
68  *  +---------------+   +---------------+         +---------------+
69  *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
70  *  +---------------+   +---------------+         +---------------+
71  *  |               |     |           |             |          |
72  *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
73  *                    |primary |  |secondary    |primary | |secondary
74  *                    |packet  |  |packet  +    |packet  | |packet  +
75  *                    +--------+  +--------+    +--------+ +--------+
76  *                        |           |             |          |
77  *                    +---v----+  +---v----+    +---v----+ +---v----+
78  *                    |primary |  |secondary    |primary | |secondary
79  *                    |packet  |  |packet  +    |packet  | |packet  +
80  *                    +--------+  +--------+    +--------+ +--------+
81  *                        |           |             |          |
82  *                    +---v----+  +---v----+    +---v----+ +---v----+
83  *                    |primary |  |secondary    |primary | |secondary
84  *                    |packet  |  |packet  +    |packet  | |packet  +
85  *                    +--------+  +--------+    +--------+ +--------+
86  */
87 
/*
 * State of one output channel that is drained by the _compare_chr_send()
 * coroutine.  CompareState owns one for the packet output (out_sendco) and
 * one for the notification channel (notify_sendco).
 */
typedef struct SendCo {
    /* Running send coroutine; reset to NULL when it finishes. */
    Coroutine *co;
    /* Owning compare object. */
    struct CompareState *s;
    /* Chardev the queued entries are written to. */
    CharBackend *chr;
    /*
     * Pending SendEntry items: compare_chr_send() pushes at the head and
     * the coroutine pops from the tail, so entries leave in FIFO order.
     */
    GQueue send_list;
    /*
     * When set, no vnet header length is written (see _compare_chr_send());
     * presumably set only on notify_sendco.
     */
    bool notify_remote_frame;
    /* True when no coroutine is running; compare_chr_send() starts one. */
    bool done;
    /* Result of the last finished coroutine run: 0 or a negative error. */
    int ret;
} SendCo;
97 
/* One queued message waiting in a SendCo's send_list. */
typedef struct SendEntry {
    /* Number of bytes in buf; written first as a 4-byte network-order length. */
    uint32_t size;
    /* Written after the length on the packet output when vnet_hdr is enabled. */
    uint32_t vnet_hdr_len;
    /* Message bytes; owned by the entry and released with g_free(). */
    uint8_t *buf;
} SendEntry;
103 
/* Per-instance state of a colo-compare object. */
struct CompareState {
    /* QOM parent; must stay the first member so COLO_COMPARE() casts work. */
    Object parent;

    /*
     * Names of the chardevs for the primary input, secondary input, output
     * and (optional) notification channel; presumably set from properties.
     */
    char *pri_indev;
    char *sec_indev;
    char *outdev;
    /* When non-NULL, miscompares are reported over this chardev instead of
     * through colo_compare_notifiers. */
    char *notify_dev;
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
    CharBackend chr_notify_dev;
    /* Reassembly state for the frames arriving on each input stream. */
    SocketReadState pri_rs;
    SocketReadState sec_rs;
    SocketReadState notify_rs;
    /* Send queues for the packet output and the notification channel. */
    SendCo out_sendco;
    SendCo notify_sendco;
    /* Write a vnet header length with each packet sent on the output. */
    bool vnet_hdr;
    /*
     * Age limit (ms) for queued primary packets before a checkpoint is
     * requested.  NOTE: this is a uint32_t, while colo_old_packet_check_one()
     * reads its limit through an int64_t pointer, so never hand that
     * callback &compare_timeout directly.
     */
    uint32_t compare_timeout;
    /* Period (ms) of the check_old_packet_regular() timer. */
    uint32_t expired_scan_cycle;

    /*
     * Connections with queued packets; packet_enqueue() appends a
     * connection when its 'processing' flag is clear.
     * Element type: Connection
     */
    GQueue conn_list;
    /* Connection lookup table keyed by ConnectionKey (one entry per connection). */
    GHashTable *connection_track_table;

    /* IOThread whose AioContext runs packet_check_timer. */
    IOThread *iothread;
    /* NOTE(review): unused in this view; presumably the iothread's main context. */
    GMainContext *worker_context;
    /* Timer that periodically scans for old primary packets. */
    QEMUTimer *packet_check_timer;

    /* Bottom half scheduled by colo_notify_compares_event() for 'event'. */
    QEMUBH *event_bh;
    enum colo_event event;

    /* Link in the global net_compares list. */
    QTAILQ_ENTRY(CompareState) next;
};
141 
/* QOM class of colo-compare; adds nothing beyond ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;
145 
/* Which input a packet arrived on; also indexes colo_mode[] below. */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};

/*
 * Printable names of the input modes, used in trace output.
 * Both the table and the strings it points to are read-only.
 */
static const char *const colo_mode[] = {
    [PRIMARY_IN] = "primary",
    [SECONDARY_IN] = "secondary",
};
155 
/*
 * Queue @size bytes of @buf for the packet output, or for the notification
 * channel when @notify_remote_frame is true.  Defined further down; declared
 * here because notify_remote_frame() and colo_release_primary_pkt() use it.
 */
static int compare_chr_send(CompareState *s,
                            uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len,
                            bool notify_remote_frame,
                            bool zero_copy);
162 
/*
 * Return true when the @packet_len bytes at @buf spell exactly @str
 * (without its terminating NUL); a prefix match does not count.
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    size_t str_len = strlen(str);

    return packet_len == str_len && memcmp(buf, str, str_len) == 0;
}
173 
174 static void notify_remote_frame(CompareState *s)
175 {
176     char msg[] = "DO_CHECKPOINT";
177     int ret = 0;
178 
179     ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
180     if (ret < 0) {
181         error_report("Notify Xen COLO-frame failed");
182     }
183 }
184 
185 static void colo_compare_inconsistency_notify(CompareState *s)
186 {
187     if (s->notify_dev) {
188         notify_remote_frame(s);
189     } else {
190         notifier_list_notify(&colo_compare_notifiers,
191                              migrate_get_current());
192     }
193 }
194 
195 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
196 {
197     struct tcp_hdr *atcp, *btcp;
198 
199     atcp = (struct tcp_hdr *)(a->transport_header);
200     btcp = (struct tcp_hdr *)(b->transport_header);
201     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
202 }
203 
204 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
205 {
206     Packet *pkt = data;
207     struct tcp_hdr *tcphd;
208 
209     tcphd = (struct tcp_hdr *)pkt->transport_header;
210 
211     pkt->tcp_seq = ntohl(tcphd->th_seq);
212     pkt->tcp_ack = ntohl(tcphd->th_ack);
213     *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
214     pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
215                        + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
216     pkt->payload_size = pkt->size - pkt->header_size;
217     pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
218     pkt->flags = tcphd->th_flags;
219 }
220 
221 /*
222  * Return 1 on success, if return 0 means the
223  * packet will be dropped
224  */
225 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
226 {
227     if (g_queue_get_length(queue) <= max_queue_size) {
228         if (pkt->ip->ip_p == IPPROTO_TCP) {
229             fill_pkt_tcp_info(pkt, max_ack);
230             g_queue_insert_sorted(queue,
231                                   pkt,
232                                   (GCompareDataFunc)seq_sorter,
233                                   NULL);
234         } else {
235             g_queue_push_tail(queue, pkt);
236         }
237         return 1;
238     }
239     return 0;
240 }
241 
242 /*
243  * Return 0 on success, if return -1 means the pkt
244  * is unsupported(arp and ipv6) and will be sent later
245  */
246 static int packet_enqueue(CompareState *s, int mode, Connection **con)
247 {
248     ConnectionKey key;
249     Packet *pkt = NULL;
250     Connection *conn;
251     int ret;
252 
253     if (mode == PRIMARY_IN) {
254         pkt = packet_new(s->pri_rs.buf,
255                          s->pri_rs.packet_len,
256                          s->pri_rs.vnet_hdr_len);
257     } else {
258         pkt = packet_new(s->sec_rs.buf,
259                          s->sec_rs.packet_len,
260                          s->sec_rs.vnet_hdr_len);
261     }
262 
263     if (parse_packet_early(pkt)) {
264         packet_destroy(pkt, NULL);
265         pkt = NULL;
266         return -1;
267     }
268     fill_connection_key(pkt, &key);
269 
270     conn = connection_get(s->connection_track_table,
271                           &key,
272                           &s->conn_list);
273 
274     if (!conn->processing) {
275         g_queue_push_tail(&s->conn_list, conn);
276         conn->processing = true;
277     }
278 
279     if (mode == PRIMARY_IN) {
280         ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
281     } else {
282         ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
283     }
284 
285     if (!ret) {
286         trace_colo_compare_drop_packet(colo_mode[mode],
287             "queue size too big, drop packet");
288         packet_destroy(pkt, NULL);
289         pkt = NULL;
290     }
291 
292     *con = conn;
293 
294     return 0;
295 }
296 
/*
 * Serial-number comparison of 32-bit TCP sequence/ACK values: true when
 * @seq1 lies strictly after @seq2, taking wrap-around into account.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t delta = (int32_t)(seq1 - seq2);

    return delta > 0;
}
301 
302 static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
303 {
304     int ret;
305     ret = compare_chr_send(s,
306                            pkt->data,
307                            pkt->size,
308                            pkt->vnet_hdr_len,
309                            false,
310                            true);
311     if (ret < 0) {
312         error_report("colo send primary packet failed");
313     }
314     trace_colo_compare_main("packet same and release packet");
315     packet_destroy_partial(pkt, NULL);
316 }
317 
318 /*
319  * The IP packets sent by primary and secondary
320  * will be compared in here
321  * TODO support ip fragment, Out-Of-Order
322  * return:    0  means packet same
323  *            > 0 || < 0 means packet different
324  */
325 static int colo_compare_packet_payload(Packet *ppkt,
326                                        Packet *spkt,
327                                        uint16_t poffset,
328                                        uint16_t soffset,
329                                        uint16_t len)
330 
331 {
332     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
333         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
334 
335         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
336         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
337         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
338         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
339 
340         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
341                                    pri_ip_dst, spkt->size,
342                                    sec_ip_src, sec_ip_dst);
343     }
344 
345     return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
346 }
347 
/*
 * Compare the not-yet-matched payload of a primary and a secondary TCP
 * packet from the same connection.
 *
 * @ppkt:    primary packet (TCP fields filled by fill_pkt_tcp_info())
 * @spkt:    secondary packet
 * @mark:    out: COLO_COMPARE_FREE_* bits naming the packet(s) that are
 *           now fully matched; 0 unless true is returned
 * @max_ack: ACK bound; a matched primary packet whose ACK is after it is
 *           held back
 *
 * Returns true when the compared bytes are identical and the caller can go
 * on with the next packet(s).  Returns false when a checkpoint is needed:
 * the payloads differ, or the primary ACK is beyond @max_ack.  On a
 * partial match, the 'offset' of the packet with bytes left over is
 * advanced by the number of bytes consumed.
 */
static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
                              int8_t *mark, uint32_t max_ack)
{
    *mark = 0;

    /* Fast path: identical sequence range, compare the whole payload. */
    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size, spkt->header_size,
                                        ppkt->payload_size)) {
            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
            return true;
        }
    }

    /*
     * The primary ends at or before the secondary: compare the rest of the
     * primary payload.  Part of the secondary payload may remain to be
     * compared against the next primary packet.
     */
    if (!after(ppkt->seq_end, spkt->seq_end)) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size + ppkt->offset,
                                        spkt->header_size + spkt->offset,
                                        ppkt->payload_size - ppkt->offset)) {
            if (!after(ppkt->tcp_ack, max_ack)) {
                *mark = COLO_COMPARE_FREE_PRIMARY;
                spkt->offset += ppkt->payload_size - ppkt->offset;
                return true;
            } else {
                /*
                 * The primary ACK is beyond max_ack, i.e. the data has not
                 * been acknowledged on both sides yet; don't send this
                 * packet out.
                 */
                return false;
            }
        }
    } else {
        /*
         * The primary packet is longer than the secondary packet: compare
         * the overlapping part and advance the primary packet's offset.
         */
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size + ppkt->offset,
                                        spkt->header_size + spkt->offset,
                                        spkt->payload_size - spkt->offset)) {
            *mark = COLO_COMPARE_FREE_SECONDARY;
            ppkt->offset += spkt->payload_size - spkt->offset;
            return true;
        }
    }

    /* Payload mismatch. */
    return false;
}
400 
/*
 * Compare the queued primary and secondary TCP packets of @conn.
 *
 * Both lists are kept sorted by sequence number (colo_insert_packet()).
 * Packets are taken from the list heads and matched byte-wise with
 * colo_mark_tcp_pkt().  Fully matched primary packets are released to the
 * output and fully matched secondary packets are freed.
 * conn->compare_seq records how far the stream has been compared.
 *
 * The function returns when either list runs dry, putting back any primary
 * packet it still holds.  On a mismatch it puts both packets back and
 * requests a checkpoint through colo_compare_inconsistency_notify().
 */
static void colo_compare_tcp(CompareState *s, Connection *conn)
{
    Packet *ppkt = NULL, *spkt = NULL;
    int8_t mark;

    /*
     * If ppkt and spkt have the same payload, but ppkt's ACK
     * is greater than spkt's ACK, in this case we can not
     * send the ppkt because it will cause the secondary guest
     * to miss sending some data in the next. Therefore, we
     * record the maximum ACK in the current queue at both
     * primary side and secondary side. Only when the ack is
     * less than the smaller of the two maximum ack, then we
     * can ensure that the packet's payload is acknowledged by
     * primary and secondary.
     */
    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;

pri:
    /* Take the next primary packet; stop when there is none. */
    if (g_queue_is_empty(&conn->primary_list)) {
        return;
    }
    ppkt = g_queue_pop_head(&conn->primary_list);
sec:
    /* Pair ppkt with the next secondary packet; if none, put ppkt back. */
    if (g_queue_is_empty(&conn->secondary_list)) {
        g_queue_push_head(&conn->primary_list, ppkt);
        return;
    }
    spkt = g_queue_pop_head(&conn->secondary_list);

    /* A primary packet carrying no payload is released without comparison. */
    if (ppkt->tcp_seq == ppkt->seq_end) {
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    /* Primary data already covered by an earlier match is released as well. */
    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
        trace_colo_compare_main("pri: this packet has compared");
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    if (spkt->tcp_seq == spkt->seq_end) {
        /* Secondary packet without payload: nothing to compare, drop it. */
        packet_destroy(spkt, NULL);
        if (!ppkt) {
            goto pri;
        } else {
            goto sec;
        }
    } else {
        /* Secondary data already covered by an earlier match: drop it. */
        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
            trace_colo_compare_main("sec: this packet has compared");
            packet_destroy(spkt, NULL);
            if (!ppkt) {
                goto pri;
            } else {
                goto sec;
            }
        }
        /* ppkt was released above: keep spkt for the next primary packet. */
        if (!ppkt) {
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
    }

    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
        trace_colo_compare_tcp_info("pri",
                                    ppkt->tcp_seq, ppkt->tcp_ack,
                                    ppkt->header_size, ppkt->payload_size,
                                    ppkt->offset, ppkt->flags);

        trace_colo_compare_tcp_info("sec",
                                    spkt->tcp_seq, spkt->tcp_ack,
                                    spkt->header_size, spkt->payload_size,
                                    spkt->offset, spkt->flags);

        /* Primary fully matched: send it, keep the rest of spkt queued. */
        if (mark == COLO_COMPARE_FREE_PRIMARY) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
        /* Secondary fully matched: free it, keep comparing the same ppkt. */
        if (mark == COLO_COMPARE_FREE_SECONDARY) {
            conn->compare_seq = spkt->seq_end;
            packet_destroy(spkt, NULL);
            goto sec;
        }
        /* Both fully matched: send ppkt, free spkt, move on. */
        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            packet_destroy(spkt, NULL);
            goto pri;
        }
    } else {
        /* Mismatch or unacknowledged data: requeue both, ask for a checkpoint. */
        g_queue_push_head(&conn->primary_list, ppkt);
        g_queue_push_head(&conn->secondary_list, spkt);

        if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
            qemu_hexdump((char *)ppkt->data, stderr,
                        "colo-compare ppkt", ppkt->size);
            qemu_hexdump((char *)spkt->data, stderr,
                        "colo-compare spkt", spkt->size);
        }

        colo_compare_inconsistency_notify(s);
    }
}
507 
508 
509 /*
510  * Called from the compare thread on the primary
511  * for compare udp packet
512  */
513 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
514 {
515     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
516     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
517 
518     trace_colo_compare_main("compare udp");
519 
520     /*
521      * Because of ppkt and spkt are both in the same connection,
522      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
523      * same with spkt. In addition, IP header's Identification is a random
524      * field, we can handle it in IP fragmentation function later.
525      * COLO just concern the response net packet payload from primary guest
526      * and secondary guest are same or not, So we ignored all IP header include
527      * other field like TOS,TTL,IP Checksum. we only need to compare
528      * the ip payload here.
529      */
530     if (ppkt->size != spkt->size) {
531         trace_colo_compare_main("UDP: payload size of packets are different");
532         return -1;
533     }
534     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
535                                     ppkt->size - offset)) {
536         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
537         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
538         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
539             qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
540                          ppkt->size);
541             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
542                          spkt->size);
543         }
544         return -1;
545     } else {
546         return 0;
547     }
548 }
549 
550 /*
551  * Called from the compare thread on the primary
552  * for compare icmp packet
553  */
554 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
555 {
556     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
557     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
558 
559     trace_colo_compare_main("compare icmp");
560 
561     /*
562      * Because of ppkt and spkt are both in the same connection,
563      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
564      * same with spkt. In addition, IP header's Identification is a random
565      * field, we can handle it in IP fragmentation function later.
566      * COLO just concern the response net packet payload from primary guest
567      * and secondary guest are same or not, So we ignored all IP header include
568      * other field like TOS,TTL,IP Checksum. we only need to compare
569      * the ip payload here.
570      */
571     if (ppkt->size != spkt->size) {
572         trace_colo_compare_main("ICMP: payload size of packets are different");
573         return -1;
574     }
575     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
576                                     ppkt->size - offset)) {
577         trace_colo_compare_icmp_miscompare("primary pkt size",
578                                            ppkt->size);
579         trace_colo_compare_icmp_miscompare("Secondary pkt size",
580                                            spkt->size);
581         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
582             qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
583                          ppkt->size);
584             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
585                          spkt->size);
586         }
587         return -1;
588     } else {
589         return 0;
590     }
591 }
592 
593 /*
594  * Called from the compare thread on the primary
595  * for compare other packet
596  */
597 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
598 {
599     uint16_t offset = ppkt->vnet_hdr_len;
600 
601     trace_colo_compare_main("compare other");
602     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
603         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
604 
605         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
606         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
607         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
608         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
609 
610         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
611                                    pri_ip_dst, spkt->size,
612                                    sec_ip_src, sec_ip_dst);
613     }
614 
615     if (ppkt->size != spkt->size) {
616         trace_colo_compare_main("Other: payload size of packets are different");
617         return -1;
618     }
619     return colo_compare_packet_payload(ppkt, spkt, offset, offset,
620                                        ppkt->size - offset);
621 }
622 
623 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
624 {
625     int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
626 
627     if ((now - pkt->creation_ms) > (*check_time)) {
628         trace_colo_old_packet_check_found(pkt->creation_ms);
629         return 0;
630     } else {
631         return 1;
632     }
633 }
634 
635 void colo_compare_register_notifier(Notifier *notify)
636 {
637     notifier_list_add(&colo_compare_notifiers, notify);
638 }
639 
640 void colo_compare_unregister_notifier(Notifier *notify)
641 {
642     notifier_remove(notify);
643 }
644 
645 static int colo_old_packet_check_one_conn(Connection *conn,
646                                           CompareState *s)
647 {
648     GList *result = NULL;
649 
650     result = g_queue_find_custom(&conn->primary_list,
651                                  &s->compare_timeout,
652                                  (GCompareFunc)colo_old_packet_check_one);
653 
654     if (result) {
655         /* Do checkpoint will flush old packet */
656         colo_compare_inconsistency_notify(s);
657         return 0;
658     }
659 
660     return 1;
661 }
662 
663 /*
664  * Look for old packets that the secondary hasn't matched,
665  * if we have some then we have to checkpoint to wake
666  * the secondary up.
667  */
668 static void colo_old_packet_check(void *opaque)
669 {
670     CompareState *s = opaque;
671 
672     /*
673      * If we find one old packet, stop finding job and notify
674      * COLO frame do checkpoint.
675      */
676     g_queue_find_custom(&s->conn_list, s,
677                         (GCompareFunc)colo_old_packet_check_one_conn);
678 }
679 
680 static void colo_compare_packet(CompareState *s, Connection *conn,
681                                 int (*HandlePacket)(Packet *spkt,
682                                 Packet *ppkt))
683 {
684     Packet *pkt = NULL;
685     GList *result = NULL;
686 
687     while (!g_queue_is_empty(&conn->primary_list) &&
688            !g_queue_is_empty(&conn->secondary_list)) {
689         pkt = g_queue_pop_head(&conn->primary_list);
690         result = g_queue_find_custom(&conn->secondary_list,
691                  pkt, (GCompareFunc)HandlePacket);
692 
693         if (result) {
694             colo_release_primary_pkt(s, pkt);
695             g_queue_remove(&conn->secondary_list, result->data);
696         } else {
697             /*
698              * If one packet arrive late, the secondary_list or
699              * primary_list will be empty, so we can't compare it
700              * until next comparison. If the packets in the list are
701              * timeout, it will trigger a checkpoint request.
702              */
703             trace_colo_compare_main("packet different");
704             g_queue_push_head(&conn->primary_list, pkt);
705 
706             colo_compare_inconsistency_notify(s);
707             break;
708         }
709     }
710 }
711 
712 /*
713  * Called from the compare thread on the primary
714  * for compare packet with secondary list of the
715  * specified connection when a new packet was
716  * queued to it.
717  */
718 static void colo_compare_connection(void *opaque, void *user_data)
719 {
720     CompareState *s = user_data;
721     Connection *conn = opaque;
722 
723     switch (conn->ip_proto) {
724     case IPPROTO_TCP:
725         colo_compare_tcp(s, conn);
726         break;
727     case IPPROTO_UDP:
728         colo_compare_packet(s, conn, colo_packet_compare_udp);
729         break;
730     case IPPROTO_ICMP:
731         colo_compare_packet(s, conn, colo_packet_compare_icmp);
732         break;
733     default:
734         colo_compare_packet(s, conn, colo_packet_compare_other);
735         break;
736     }
737 }
738 
739 static void coroutine_fn _compare_chr_send(void *opaque)
740 {
741     SendCo *sendco = opaque;
742     CompareState *s = sendco->s;
743     int ret = 0;
744 
745     while (!g_queue_is_empty(&sendco->send_list)) {
746         SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
747         uint32_t len = htonl(entry->size);
748 
749         ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
750 
751         if (ret != sizeof(len)) {
752             g_free(entry->buf);
753             g_slice_free(SendEntry, entry);
754             goto err;
755         }
756 
757         if (!sendco->notify_remote_frame && s->vnet_hdr) {
758             /*
759              * We send vnet header len make other module(like filter-redirector)
760              * know how to parse net packet correctly.
761              */
762             len = htonl(entry->vnet_hdr_len);
763 
764             ret = qemu_chr_fe_write_all(sendco->chr,
765                                         (uint8_t *)&len,
766                                         sizeof(len));
767 
768             if (ret != sizeof(len)) {
769                 g_free(entry->buf);
770                 g_slice_free(SendEntry, entry);
771                 goto err;
772             }
773         }
774 
775         ret = qemu_chr_fe_write_all(sendco->chr,
776                                     (uint8_t *)entry->buf,
777                                     entry->size);
778 
779         if (ret != entry->size) {
780             g_free(entry->buf);
781             g_slice_free(SendEntry, entry);
782             goto err;
783         }
784 
785         g_free(entry->buf);
786         g_slice_free(SendEntry, entry);
787     }
788 
789     sendco->ret = 0;
790     goto out;
791 
792 err:
793     while (!g_queue_is_empty(&sendco->send_list)) {
794         SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
795         g_free(entry->buf);
796         g_slice_free(SendEntry, entry);
797     }
798     sendco->ret = ret < 0 ? ret : -EIO;
799 out:
800     sendco->co = NULL;
801     sendco->done = true;
802     aio_wait_kick();
803 }
804 
805 static int compare_chr_send(CompareState *s,
806                             uint8_t *buf,
807                             uint32_t size,
808                             uint32_t vnet_hdr_len,
809                             bool notify_remote_frame,
810                             bool zero_copy)
811 {
812     SendCo *sendco;
813     SendEntry *entry;
814 
815     if (notify_remote_frame) {
816         sendco = &s->notify_sendco;
817     } else {
818         sendco = &s->out_sendco;
819     }
820 
821     if (!size) {
822         return 0;
823     }
824 
825     entry = g_slice_new(SendEntry);
826     entry->size = size;
827     entry->vnet_hdr_len = vnet_hdr_len;
828     if (zero_copy) {
829         entry->buf = buf;
830     } else {
831         entry->buf = g_malloc(size);
832         memcpy(entry->buf, buf, size);
833     }
834     g_queue_push_head(&sendco->send_list, entry);
835 
836     if (sendco->done) {
837         sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
838         sendco->done = false;
839         qemu_coroutine_enter(sendco->co);
840         if (sendco->done) {
841             /* report early errors */
842             return sendco->ret;
843         }
844     }
845 
846     /* assume success */
847     return 0;
848 }
849 
850 static int compare_chr_can_read(void *opaque)
851 {
852     return COMPARE_READ_LEN_MAX;
853 }
854 
855 /*
856  * Called from the main thread on the primary for packets
857  * arriving over the socket from the primary.
858  */
859 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
860 {
861     CompareState *s = COLO_COMPARE(opaque);
862     int ret;
863 
864     ret = net_fill_rstate(&s->pri_rs, buf, size);
865     if (ret == -1) {
866         qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
867                                  NULL, NULL, true);
868         error_report("colo-compare primary_in error");
869     }
870 }
871 
872 /*
873  * Called from the main thread on the primary for packets
874  * arriving over the socket from the secondary.
875  */
876 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
877 {
878     CompareState *s = COLO_COMPARE(opaque);
879     int ret;
880 
881     ret = net_fill_rstate(&s->sec_rs, buf, size);
882     if (ret == -1) {
883         qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
884                                  NULL, NULL, true);
885         error_report("colo-compare secondary_in error");
886     }
887 }
888 
889 static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
890 {
891     CompareState *s = COLO_COMPARE(opaque);
892     int ret;
893 
894     ret = net_fill_rstate(&s->notify_rs, buf, size);
895     if (ret == -1) {
896         qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
897                                  NULL, NULL, true);
898         error_report("colo-compare notify_dev error");
899     }
900 }
901 
902 /*
903  * Check old packet regularly so it can watch for any packets
904  * that the secondary hasn't produced equivalents of.
905  */
906 static void check_old_packet_regular(void *opaque)
907 {
908     CompareState *s = opaque;
909 
910     /* if have old packet we will notify checkpoint */
911     colo_old_packet_check(s);
912     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
913               s->expired_scan_cycle);
914 }
915 
916 /* Public API, Used for COLO frame to notify compare event */
917 void colo_notify_compares_event(void *opaque, int event, Error **errp)
918 {
919     CompareState *s;
920     qemu_mutex_lock(&colo_compare_mutex);
921 
922     if (!colo_compare_active) {
923         qemu_mutex_unlock(&colo_compare_mutex);
924         return;
925     }
926 
927     qemu_mutex_lock(&event_mtx);
928     QTAILQ_FOREACH(s, &net_compares, next) {
929         s->event = event;
930         qemu_bh_schedule(s->event_bh);
931         event_unhandled_count++;
932     }
933     /* Wait all compare threads to finish handling this event */
934     while (event_unhandled_count > 0) {
935         qemu_cond_wait(&event_complete_cond, &event_mtx);
936     }
937 
938     qemu_mutex_unlock(&event_mtx);
939     qemu_mutex_unlock(&colo_compare_mutex);
940 }
941 
942 static void colo_compare_timer_init(CompareState *s)
943 {
944     AioContext *ctx = iothread_get_aio_context(s->iothread);
945 
946     s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
947                                 SCALE_MS, check_old_packet_regular,
948                                 s);
949     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
950               s->expired_scan_cycle);
951 }
952 
953 static void colo_compare_timer_del(CompareState *s)
954 {
955     if (s->packet_check_timer) {
956         timer_del(s->packet_check_timer);
957         timer_free(s->packet_check_timer);
958         s->packet_check_timer = NULL;
959     }
960  }
961 
962 static void colo_flush_packets(void *opaque, void *user_data);
963 
964 static void colo_compare_handle_event(void *opaque)
965 {
966     CompareState *s = opaque;
967 
968     switch (s->event) {
969     case COLO_EVENT_CHECKPOINT:
970         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
971         break;
972     case COLO_EVENT_FAILOVER:
973         break;
974     default:
975         break;
976     }
977 
978     qemu_mutex_lock(&event_mtx);
979     assert(event_unhandled_count > 0);
980     event_unhandled_count--;
981     qemu_cond_broadcast(&event_complete_cond);
982     qemu_mutex_unlock(&event_mtx);
983 }
984 
985 static void colo_compare_iothread(CompareState *s)
986 {
987     AioContext *ctx = iothread_get_aio_context(s->iothread);
988     object_ref(OBJECT(s->iothread));
989     s->worker_context = iothread_get_g_main_context(s->iothread);
990 
991     qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
992                              compare_pri_chr_in, NULL, NULL,
993                              s, s->worker_context, true);
994     qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
995                              compare_sec_chr_in, NULL, NULL,
996                              s, s->worker_context, true);
997     if (s->notify_dev) {
998         qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
999                                  compare_notify_chr, NULL, NULL,
1000                                  s, s->worker_context, true);
1001     }
1002 
1003     colo_compare_timer_init(s);
1004     s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
1005 }
1006 
1007 static char *compare_get_pri_indev(Object *obj, Error **errp)
1008 {
1009     CompareState *s = COLO_COMPARE(obj);
1010 
1011     return g_strdup(s->pri_indev);
1012 }
1013 
1014 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
1015 {
1016     CompareState *s = COLO_COMPARE(obj);
1017 
1018     g_free(s->pri_indev);
1019     s->pri_indev = g_strdup(value);
1020 }
1021 
1022 static char *compare_get_sec_indev(Object *obj, Error **errp)
1023 {
1024     CompareState *s = COLO_COMPARE(obj);
1025 
1026     return g_strdup(s->sec_indev);
1027 }
1028 
1029 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
1030 {
1031     CompareState *s = COLO_COMPARE(obj);
1032 
1033     g_free(s->sec_indev);
1034     s->sec_indev = g_strdup(value);
1035 }
1036 
1037 static char *compare_get_outdev(Object *obj, Error **errp)
1038 {
1039     CompareState *s = COLO_COMPARE(obj);
1040 
1041     return g_strdup(s->outdev);
1042 }
1043 
1044 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
1045 {
1046     CompareState *s = COLO_COMPARE(obj);
1047 
1048     g_free(s->outdev);
1049     s->outdev = g_strdup(value);
1050 }
1051 
1052 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
1053 {
1054     CompareState *s = COLO_COMPARE(obj);
1055 
1056     return s->vnet_hdr;
1057 }
1058 
1059 static void compare_set_vnet_hdr(Object *obj,
1060                                  bool value,
1061                                  Error **errp)
1062 {
1063     CompareState *s = COLO_COMPARE(obj);
1064 
1065     s->vnet_hdr = value;
1066 }
1067 
1068 static char *compare_get_notify_dev(Object *obj, Error **errp)
1069 {
1070     CompareState *s = COLO_COMPARE(obj);
1071 
1072     return g_strdup(s->notify_dev);
1073 }
1074 
1075 static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
1076 {
1077     CompareState *s = COLO_COMPARE(obj);
1078 
1079     g_free(s->notify_dev);
1080     s->notify_dev = g_strdup(value);
1081 }
1082 
1083 static void compare_get_timeout(Object *obj, Visitor *v,
1084                                 const char *name, void *opaque,
1085                                 Error **errp)
1086 {
1087     CompareState *s = COLO_COMPARE(obj);
1088     uint32_t value = s->compare_timeout;
1089 
1090     visit_type_uint32(v, name, &value, errp);
1091 }
1092 
1093 static void compare_set_timeout(Object *obj, Visitor *v,
1094                                 const char *name, void *opaque,
1095                                 Error **errp)
1096 {
1097     CompareState *s = COLO_COMPARE(obj);
1098     uint32_t value;
1099 
1100     if (!visit_type_uint32(v, name, &value, errp)) {
1101         return;
1102     }
1103     if (!value) {
1104         error_setg(errp, "Property '%s.%s' requires a positive value",
1105                    object_get_typename(obj), name);
1106         return;
1107     }
1108     s->compare_timeout = value;
1109 }
1110 
1111 static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
1112                                            const char *name, void *opaque,
1113                                            Error **errp)
1114 {
1115     CompareState *s = COLO_COMPARE(obj);
1116     uint32_t value = s->expired_scan_cycle;
1117 
1118     visit_type_uint32(v, name, &value, errp);
1119 }
1120 
1121 static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
1122                                            const char *name, void *opaque,
1123                                            Error **errp)
1124 {
1125     CompareState *s = COLO_COMPARE(obj);
1126     uint32_t value;
1127 
1128     if (!visit_type_uint32(v, name, &value, errp)) {
1129         return;
1130     }
1131     if (!value) {
1132         error_setg(errp, "Property '%s.%s' requires a positive value",
1133                    object_get_typename(obj), name);
1134         return;
1135     }
1136     s->expired_scan_cycle = value;
1137 }
1138 
1139 static void get_max_queue_size(Object *obj, Visitor *v,
1140                                const char *name, void *opaque,
1141                                Error **errp)
1142 {
1143     uint32_t value = max_queue_size;
1144 
1145     visit_type_uint32(v, name, &value, errp);
1146 }
1147 
1148 static void set_max_queue_size(Object *obj, Visitor *v,
1149                                const char *name, void *opaque,
1150                                Error **errp)
1151 {
1152     Error *local_err = NULL;
1153     uint32_t value;
1154 
1155     visit_type_uint32(v, name, &value, &local_err);
1156     if (local_err) {
1157         goto out;
1158     }
1159     if (!value) {
1160         error_setg(&local_err, "Property '%s.%s' requires a positive value",
1161                    object_get_typename(obj), name);
1162         goto out;
1163     }
1164     max_queue_size = value;
1165 
1166 out:
1167     error_propagate(errp, local_err);
1168 }
1169 
1170 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
1171 {
1172     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
1173     Connection *conn = NULL;
1174 
1175     if (packet_enqueue(s, PRIMARY_IN, &conn)) {
1176         trace_colo_compare_main("primary: unsupported packet in");
1177         compare_chr_send(s,
1178                          pri_rs->buf,
1179                          pri_rs->packet_len,
1180                          pri_rs->vnet_hdr_len,
1181                          false,
1182                          false);
1183     } else {
1184         /* compare packet in the specified connection */
1185         colo_compare_connection(conn, s);
1186     }
1187 }
1188 
1189 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
1190 {
1191     CompareState *s = container_of(sec_rs, CompareState, sec_rs);
1192     Connection *conn = NULL;
1193 
1194     if (packet_enqueue(s, SECONDARY_IN, &conn)) {
1195         trace_colo_compare_main("secondary: unsupported packet in");
1196     } else {
1197         /* compare packet in the specified connection */
1198         colo_compare_connection(conn, s);
1199     }
1200 }
1201 
1202 static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1203 {
1204     CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1205 
1206     const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1207     int ret;
1208 
1209     if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
1210                            notify_rs->buf,
1211                            notify_rs->packet_len)) {
1212         ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
1213         if (ret < 0) {
1214             error_report("Notify Xen COLO-frame INIT failed");
1215         }
1216     } else if (packet_matches_str("COLO_CHECKPOINT",
1217                                   notify_rs->buf,
1218                                   notify_rs->packet_len)) {
1219         /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1220         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1221     } else {
1222         error_report("COLO compare got unsupported instruction");
1223     }
1224 }
1225 
1226 /*
1227  * Return 0 is success.
1228  * Return 1 is failed.
1229  */
1230 static int find_and_check_chardev(Chardev **chr,
1231                                   char *chr_name,
1232                                   Error **errp)
1233 {
1234     *chr = qemu_chr_find(chr_name);
1235     if (*chr == NULL) {
1236         error_setg(errp, "Device '%s' not found",
1237                    chr_name);
1238         return 1;
1239     }
1240 
1241     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1242         error_setg(errp, "chardev \"%s\" is not reconnectable",
1243                    chr_name);
1244         return 1;
1245     }
1246 
1247     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1248         error_setg(errp, "chardev \"%s\" cannot switch context",
1249                    chr_name);
1250         return 1;
1251     }
1252 
1253     return 0;
1254 }
1255 
1256 /*
1257  * Called from the main thread on the primary
1258  * to setup colo-compare.
1259  */
1260 static void colo_compare_complete(UserCreatable *uc, Error **errp)
1261 {
1262     CompareState *s = COLO_COMPARE(uc);
1263     Chardev *chr;
1264 
1265     if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
1266         error_setg(errp, "colo compare needs 'primary_in' ,"
1267                    "'secondary_in','outdev','iothread' property set");
1268         return;
1269     } else if (!strcmp(s->pri_indev, s->outdev) ||
1270                !strcmp(s->sec_indev, s->outdev) ||
1271                !strcmp(s->pri_indev, s->sec_indev)) {
1272         error_setg(errp, "'indev' and 'outdev' could not be same "
1273                    "for compare module");
1274         return;
1275     }
1276 
1277     if (!s->compare_timeout) {
1278         /* Set default value to 3000 MS */
1279         s->compare_timeout = DEFAULT_TIME_OUT_MS;
1280     }
1281 
1282     if (!s->expired_scan_cycle) {
1283         /* Set default value to 3000 MS */
1284         s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
1285     }
1286 
1287     if (!max_queue_size) {
1288         /* Set default queue size to 1024 */
1289         max_queue_size = MAX_QUEUE_SIZE;
1290     }
1291 
1292     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1293         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
1294         return;
1295     }
1296 
1297     if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1298         !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
1299         return;
1300     }
1301 
1302     if (find_and_check_chardev(&chr, s->outdev, errp) ||
1303         !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
1304         return;
1305     }
1306 
1307     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1308     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1309 
1310     /* Try to enable remote notify chardev, currently just for Xen COLO */
1311     if (s->notify_dev) {
1312         if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1313             !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1314             return;
1315         }
1316 
1317         net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1318                            s->vnet_hdr);
1319     }
1320 
1321     s->out_sendco.s = s;
1322     s->out_sendco.chr = &s->chr_out;
1323     s->out_sendco.notify_remote_frame = false;
1324     s->out_sendco.done = true;
1325     g_queue_init(&s->out_sendco.send_list);
1326 
1327     if (s->notify_dev) {
1328         s->notify_sendco.s = s;
1329         s->notify_sendco.chr = &s->chr_notify_dev;
1330         s->notify_sendco.notify_remote_frame = true;
1331         s->notify_sendco.done = true;
1332         g_queue_init(&s->notify_sendco.send_list);
1333     }
1334 
1335     g_queue_init(&s->conn_list);
1336 
1337     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1338                                                       connection_key_equal,
1339                                                       g_free,
1340                                                       connection_destroy);
1341 
1342     colo_compare_iothread(s);
1343 
1344     qemu_mutex_lock(&colo_compare_mutex);
1345     if (!colo_compare_active) {
1346         qemu_mutex_init(&event_mtx);
1347         qemu_cond_init(&event_complete_cond);
1348         colo_compare_active = true;
1349     }
1350     QTAILQ_INSERT_TAIL(&net_compares, s, next);
1351     qemu_mutex_unlock(&colo_compare_mutex);
1352 
1353     return;
1354 }
1355 
1356 static void colo_flush_packets(void *opaque, void *user_data)
1357 {
1358     CompareState *s = user_data;
1359     Connection *conn = opaque;
1360     Packet *pkt = NULL;
1361 
1362     while (!g_queue_is_empty(&conn->primary_list)) {
1363         pkt = g_queue_pop_head(&conn->primary_list);
1364         compare_chr_send(s,
1365                          pkt->data,
1366                          pkt->size,
1367                          pkt->vnet_hdr_len,
1368                          false,
1369                          true);
1370         packet_destroy_partial(pkt, NULL);
1371     }
1372     while (!g_queue_is_empty(&conn->secondary_list)) {
1373         pkt = g_queue_pop_head(&conn->secondary_list);
1374         packet_destroy(pkt, NULL);
1375     }
1376 }
1377 
1378 static void colo_compare_class_init(ObjectClass *oc, void *data)
1379 {
1380     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1381 
1382     ucc->complete = colo_compare_complete;
1383 }
1384 
1385 static void colo_compare_init(Object *obj)
1386 {
1387     CompareState *s = COLO_COMPARE(obj);
1388 
1389     object_property_add_str(obj, "primary_in",
1390                             compare_get_pri_indev, compare_set_pri_indev);
1391     object_property_add_str(obj, "secondary_in",
1392                             compare_get_sec_indev, compare_set_sec_indev);
1393     object_property_add_str(obj, "outdev",
1394                             compare_get_outdev, compare_set_outdev);
1395     object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1396                             (Object **)&s->iothread,
1397                             object_property_allow_set_link,
1398                             OBJ_PROP_LINK_STRONG);
1399     /* This parameter just for Xen COLO */
1400     object_property_add_str(obj, "notify_dev",
1401                             compare_get_notify_dev, compare_set_notify_dev);
1402 
1403     object_property_add(obj, "compare_timeout", "uint32",
1404                         compare_get_timeout,
1405                         compare_set_timeout, NULL, NULL);
1406 
1407     object_property_add(obj, "expired_scan_cycle", "uint32",
1408                         compare_get_expired_scan_cycle,
1409                         compare_set_expired_scan_cycle, NULL, NULL);
1410 
1411     object_property_add(obj, "max_queue_size", "uint32",
1412                         get_max_queue_size,
1413                         set_max_queue_size, NULL, NULL);
1414 
1415     s->vnet_hdr = false;
1416     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1417                              compare_set_vnet_hdr);
1418 }
1419 
1420 static void colo_compare_finalize(Object *obj)
1421 {
1422     CompareState *s = COLO_COMPARE(obj);
1423     CompareState *tmp = NULL;
1424 
1425     qemu_mutex_lock(&colo_compare_mutex);
1426     QTAILQ_FOREACH(tmp, &net_compares, next) {
1427         if (tmp == s) {
1428             QTAILQ_REMOVE(&net_compares, s, next);
1429             break;
1430         }
1431     }
1432     if (QTAILQ_EMPTY(&net_compares)) {
1433         colo_compare_active = false;
1434         qemu_mutex_destroy(&event_mtx);
1435         qemu_cond_destroy(&event_complete_cond);
1436     }
1437     qemu_mutex_unlock(&colo_compare_mutex);
1438 
1439     qemu_chr_fe_deinit(&s->chr_pri_in, false);
1440     qemu_chr_fe_deinit(&s->chr_sec_in, false);
1441     qemu_chr_fe_deinit(&s->chr_out, false);
1442     if (s->notify_dev) {
1443         qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1444     }
1445 
1446     colo_compare_timer_del(s);
1447 
1448     qemu_bh_delete(s->event_bh);
1449 
1450     AioContext *ctx = iothread_get_aio_context(s->iothread);
1451     aio_context_acquire(ctx);
1452     AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
1453     if (s->notify_dev) {
1454         AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
1455     }
1456     aio_context_release(ctx);
1457 
1458     /* Release all unhandled packets after compare thead exited */
1459     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1460     AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
1461 
1462     g_queue_clear(&s->conn_list);
1463     g_queue_clear(&s->out_sendco.send_list);
1464     if (s->notify_dev) {
1465         g_queue_clear(&s->notify_sendco.send_list);
1466     }
1467 
1468     if (s->connection_track_table) {
1469         g_hash_table_destroy(s->connection_track_table);
1470     }
1471 
1472     object_unref(OBJECT(s->iothread));
1473 
1474     g_free(s->pri_indev);
1475     g_free(s->sec_indev);
1476     g_free(s->outdev);
1477     g_free(s->notify_dev);
1478 }
1479 
1480 static void __attribute__((__constructor__)) colo_compare_init_globals(void)
1481 {
1482     colo_compare_active = false;
1483     qemu_mutex_init(&colo_compare_mutex);
1484 }
1485 
1486 static const TypeInfo colo_compare_info = {
1487     .name = TYPE_COLO_COMPARE,
1488     .parent = TYPE_OBJECT,
1489     .instance_size = sizeof(CompareState),
1490     .instance_init = colo_compare_init,
1491     .instance_finalize = colo_compare_finalize,
1492     .class_size = sizeof(CompareClass),
1493     .class_init = colo_compare_class_init,
1494     .interfaces = (InterfaceInfo[]) {
1495         { TYPE_USER_CREATABLE },
1496         { }
1497     }
1498 };
1499 
1500 static void register_types(void)
1501 {
1502     type_register_static(&colo_compare_info);
1503 }
1504 
1505 type_init(register_types);
1506