xref: /src/sbin/ipfw/tests/test_add_rule.py (revision 32cd3ee5901ea33d41ff550e5f40ce743c8d4165)
1import errno
2import json
3import os
4import socket
5import struct
6import subprocess
7import sys
8from ctypes import c_byte
9from ctypes import c_char
10from ctypes import c_int
11from ctypes import c_long
12from ctypes import c_uint32
13from ctypes import c_uint8
14from ctypes import c_ulong
15from ctypes import c_ushort
16from ctypes import sizeof
17from ctypes import Structure
18from enum import Enum
19from typing import Any
20from typing import Dict
21from typing import List
22from typing import NamedTuple
23from typing import Optional
24from typing import Union
25
26import pytest
27from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
28from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
29from atf_python.sys.netpfil.ipfw.insns import Insn
30from atf_python.sys.netpfil.ipfw.insns import InsnComment
31from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
32from atf_python.sys.netpfil.ipfw.insns import InsnIp
33from atf_python.sys.netpfil.ipfw.insns import InsnIp6
34from atf_python.sys.netpfil.ipfw.insns import InsnPorts
35from atf_python.sys.netpfil.ipfw.insns import InsnProb
36from atf_python.sys.netpfil.ipfw.insns import InsnProto
37from atf_python.sys.netpfil.ipfw.insns import InsnReject
38from atf_python.sys.netpfil.ipfw.insns import InsnTable
39from atf_python.sys.netpfil.ipfw.insns import InsnU32
40from atf_python.sys.netpfil.ipfw.insns import InsnKidx
41from atf_python.sys.netpfil.ipfw.insns import InsnLookup
42from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
43from atf_python.sys.netpfil.ipfw.insn_headers import IpFwTableLookupType
44from atf_python.sys.netpfil.ipfw.insn_headers import IpFwTableValueType
45from atf_python.sys.netpfil.ipfw.ioctl import CTlv
46from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
47from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
48from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
49from atf_python.sys.netpfil.ipfw.ioctl import NTlv
50from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
51from atf_python.sys.netpfil.ipfw.ioctl import RawRule
52from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
53from atf_python.sys.netpfil.ipfw.utils import enum_from_int
54from atf_python.utils import BaseTest
55
56
57IPFW_PATH = "/sbin/ipfw"
58
59
60def differ(w_obj, g_obj, w_stack=[], g_stack=[]):
61    if bytes(w_obj) == bytes(g_obj):
62        return True
63    num_objects = 0
64    for i, w_child in enumerate(w_obj.obj_list):
65        if i >= len(g_obj.obj_list):
66            print("MISSING object from chain {}".format(" / ".join(w_stack)))
67            w_child.print_obj()
68            print("==========================")
69            return False
70        g_child = g_obj.obj_list[i]
71        if bytes(w_child) == bytes(g_child):
72            num_objects += 1
73            continue
74        w_stack.append(w_obj.obj_name)
75        g_stack.append(g_obj.obj_name)
76        if not differ(w_child, g_child, w_stack, g_stack):
77            return False
78        break
79    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
80        g_child = g_obj.obj_list[num_objects]
81        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
82        g_child.print_obj()
83        print("==========================")
84        return False
85    print("OBJECTS DIFFER")
86    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
87    w_obj.print_obj()
88    w_obj.print_obj_hex()
89    print("==========================")
90    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
91    g_obj.print_obj()
92    g_obj.print_obj_hex()
93    print("==========================")
94    return False
95
96
97class TestAddRule(BaseTest):
98    def compile_rule(self, out):
99        tlvs = []
100        if "objs" in out:
101            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
102        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
103        tlvs.append(CTlvRule(obj_list=[rule]))
104        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)
105
106    def verify_rule(self, in_data: str, out_data):
107        # Prepare the desired output
108        expected = self.compile_rule(out_data)
109
110        reader = DebugIoReader(IPFW_PATH)
111        ioctls = reader.get_records(in_data)
112        assert len(ioctls) == 1  # Only 1 ioctl request expected
113        got = ioctls[0]
114
115        if not differ(expected, got):
116            print("=> CMD: {}".format(in_data))
117            print("=> WANTED:")
118            expected.print_obj()
119            print("==========================")
120            print("=> GOT:")
121            got.print_obj()
122            print("==========================")
123        assert bytes(got) == bytes(expected)
124
125    @pytest.mark.parametrize(
126        "rule",
127        [
128            pytest.param(
129                {
130                    "in": "add 200 allow ip from any to any",
131                    "out": {
132                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
133                        "rulenum": 200,
134                    },
135                },
136                id="test_rulenum",
137            ),
138            pytest.param(
139                {
140                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
141                    "out": {
142                        "insns": [
143                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
144                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
145                            InsnEmpty(IpFwOpcode.O_ACCEPT),
146                        ],
147                    },
148                },
149                id="test_or",
150            ),
151            pytest.param(
152                {
153                    "in": "add allow ip from table(AAA) to table(BBB)",
154                    "out": {
155                        "objs": [
156                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
157                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
158                        ],
159                        "insns": [
160                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
161                            InsnKidx(IpFwOpcode.O_IP_DST_LOOKUP, kidx=2),
162                            InsnEmpty(IpFwOpcode.O_ACCEPT),
163                        ],
164                    },
165                },
166                id="test_tables",
167            ),
168            pytest.param(
169                {
170                    "in": "add allow ip from table(AAA) to any lookup dst-ip BBB",
171                    "out": {
172                        "objs": [
173                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
174                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
175                        ],
176                        "insns": [
177                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
178                            InsnLookup(IpFwOpcode.O_TABLE_LOOKUP,
179                                kidx=2,
180                                arg1=InsnLookup.compile_arg1(lookup_type=IpFwTableLookupType.LOOKUP_DST_IP),
181                                ),
182                            InsnEmpty(IpFwOpcode.O_ACCEPT),
183                        ],
184                    },
185                },
186                id="test_tables_lookup_no_mask",
187            ),
188            pytest.param(
189                {
190                    "in": "add allow ip from table(AAA) to any lookup mark:0xf00baa1 BBB",
191                    "out": {
192                        "objs": [
193                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
194                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
195                        ],
196                        "insns": [
197                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
198                            InsnLookup(IpFwOpcode.O_TABLE_LOOKUP,
199                                kidx=2,
200                                arg1=InsnLookup.compile_arg1(lookup_type=IpFwTableLookupType.LOOKUP_MARK, bitmask=True),
201                                bitmask=0xf00baa1),
202                            InsnEmpty(IpFwOpcode.O_ACCEPT),
203                        ],
204                    },
205                },
206                id="test_tables_lookup_u32_mask",
207            ),
208            pytest.param(
209                {
210                    "in": "add allow ip from table(AAA) to any lookup src-mac:1a:2b:3c:4d:5e:6f BBB",
211                    "out": {
212                        "objs": [
213                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
214                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
215                        ],
216                        "insns": [
217                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
218                            InsnLookup(IpFwOpcode.O_TABLE_LOOKUP,
219                                kidx=2,
220                                arg1=InsnLookup.compile_arg1(lookup_type=IpFwTableLookupType.LOOKUP_SRC_MAC, bitmask=True),
221                                bitmask="1a:2b:3c:4d:5e:6f"),
222                            InsnEmpty(IpFwOpcode.O_ACCEPT),
223                        ],
224                    },
225                },
226                id="test_tables_lookup_mac_mask",
227            ),
228            pytest.param(
229                {
230                    "in": "add allow ip from table(AAA) to any lookup dst-ip4:1715004 BBB",
231                    "out": {
232                        "objs": [
233                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
234                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
235                        ],
236                        "insns": [
237                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
238                            InsnLookup(IpFwOpcode.O_TABLE_LOOKUP,
239                                kidx=2,
240                                arg1=InsnLookup.compile_arg1(lookup_type=IpFwTableLookupType.LOOKUP_DST_IP4, bitmask=True),
241                                bitmask="60.43.26.0"),
242                            InsnEmpty(IpFwOpcode.O_ACCEPT),
243                        ],
244                    },
245                },
246                id="test_tables_lookup_dst_ip4_numeric",
247            ),
248            pytest.param(
249                {
250                    "in": "add allow ip from table(AAA) to any lookup src-ip4:0.0.0.255 BBB",
251                    "out": {
252                        "objs": [
253                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
254                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
255                        ],
256                        "insns": [
257                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
258                            InsnLookup(IpFwOpcode.O_TABLE_LOOKUP,
259                                kidx=2,
260                                arg1=InsnLookup.compile_arg1(lookup_type=IpFwTableLookupType.LOOKUP_SRC_IP4, bitmask=True),
261                                bitmask="0.0.0.255"),
262                            InsnEmpty(IpFwOpcode.O_ACCEPT),
263                        ],
264                    },
265                },
266                id="test_tables_lookup_src_ip4_addr",
267            ),
268            pytest.param(
269                {
270                    "in": "add allow ip from table(AAA) to any lookup jail:0.0.252.128 BBB",
271                    "out": {
272                        "objs": [
273                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
274                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
275                        ],
276                        "insns": [
277                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
278                            InsnLookup(IpFwOpcode.O_TABLE_LOOKUP,
279                                kidx=2,
280                                arg1=InsnLookup.compile_arg1(lookup_type=IpFwTableLookupType.LOOKUP_JAIL, bitmask=True),
281                                bitmask=64640),
282                            InsnEmpty(IpFwOpcode.O_ACCEPT),
283                        ],
284                    },
285                },
286                id="test_tables_lookup_jail_addr",
287            ),
288            pytest.param(
289                {
290                    "in": "add allow ip from table(AAA) to any lookup dst-ip6:ffff:ffff:f00:baaa:b00c:: BBB",
291                    "out": {
292                        "objs": [
293                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
294                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
295                        ],
296                        "insns": [
297                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
298                            InsnLookup(IpFwOpcode.O_TABLE_LOOKUP,
299                                kidx=2,
300                                arg1=InsnLookup.compile_arg1(lookup_type=IpFwTableLookupType.LOOKUP_DST_IP6, bitmask=True),
301                                bitmask="ffff:ffff:f00:baaa:b00c::"),
302                            InsnEmpty(IpFwOpcode.O_ACCEPT),
303                        ],
304                    },
305                },
306                id="test_tables_lookup_dst_ip6",
307            ),
308            pytest.param(
309                {
310                    "in": "add allow ip from table(AAA,16777215) to any",
311                    "out": {
312                        "objs": [
313                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
314                        ],
315                        "insns": [
316                            InsnLookup(IpFwOpcode.O_IP_SRC_LOOKUP,
317                                kidx=1,
318                                arg1=InsnLookup.compile_arg1(value_type=IpFwTableValueType.TVALUE_TAG),
319                                value=16777215),
320                            InsnEmpty(IpFwOpcode.O_ACCEPT),
321                        ],
322                    },
323                },
324                id="test_tables_check_value_legacy",
325            ),
326            pytest.param(
327                {
328                    "in": "add allow ip from table(AAA,nat=1231) to any",
329                    "out": {
330                        "objs": [
331                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
332                        ],
333                        "insns": [
334                            InsnLookup(IpFwOpcode.O_IP_SRC_LOOKUP,
335                                kidx=1,
336                                arg1=InsnLookup.compile_arg1(value_type=IpFwTableValueType.TVALUE_NAT),
337                                value=1231),
338                            InsnEmpty(IpFwOpcode.O_ACCEPT),
339                        ],
340                    },
341                },
342                id="test_tables_check_value_nat",
343            ),
344            pytest.param(
345                {
346                    "in": "add allow ip from table(AAA,nh4=10.20.30.40) to any",
347                    "out": {
348                        "objs": [
349                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
350                        ],
351                        "insns": [
352                            InsnLookup(IpFwOpcode.O_IP_SRC_LOOKUP,
353                                kidx=1,
354                                arg1=InsnLookup.compile_arg1(value_type=IpFwTableValueType.TVALUE_NH4),
355                                value="10.20.30.40"),
356                            InsnEmpty(IpFwOpcode.O_ACCEPT),
357                        ],
358                    },
359                },
360                id="test_tables_check_value_nh4",
361            ),
362            pytest.param(
363                {
364                    "in": "add allow ip from table(AAA,nh6=ff02:1234:b00c::abcd) to any",
365                    "out": {
366                        "objs": [
367                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
368                        ],
369                        "insns": [
370                            InsnLookup(IpFwOpcode.O_IP_SRC_LOOKUP,
371                                kidx=1,
372                                arg1=InsnLookup.compile_arg1(value_type=IpFwTableValueType.TVALUE_NH6),
373                                value="ff02:1234:b00c::abcd"),
374                            InsnEmpty(IpFwOpcode.O_ACCEPT),
375                        ],
376                    },
377                },
378                id="test_tables_check_value_nh6",
379            ),
380            pytest.param(
381                {
382                    "in": "add allow ip from any to 1.2.3.4 // test comment",
383                    "out": {
384                        "insns": [
385                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
386                            InsnComment(comment="test comment"),
387                            InsnEmpty(IpFwOpcode.O_ACCEPT),
388                        ],
389                    },
390                },
391                id="test_comment",
392            ),
393            pytest.param(
394                {
395                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
396                    "out": {
397                        "objs": [
398                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
399                        ],
400                        "insns": [
401                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
402                            InsnKidx(IpFwOpcode.O_EXTERNAL_ACTION, kidx=1),
403                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
404                        ],
405                    },
406                },
407                id="test_eaction_tcp-setmss",
408            ),
409            pytest.param(
410                {
411                    "in": "add eaction nptv6 AAA ip from any to 1.2.3.4",
412                    "out": {
413                        "objs": [
414                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="nptv6"),
415                            NTlv(0, idx=2, name="AAA"),
416                        ],
417                        "insns": [
418                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
419                            InsnKidx(IpFwOpcode.O_EXTERNAL_ACTION, kidx=1),
420                            InsnKidx(IpFwOpcode.O_EXTERNAL_INSTANCE, kidx=2),
421                        ],
422                    },
423                },
424                id="test_eaction_nptv6",
425            ),
426            pytest.param(
427                {
428                    "in": "add // test comment",
429                    "out": {
430                        "insns": [
431                            InsnComment(comment="test comment"),
432                            Insn(IpFwOpcode.O_COUNT),
433                        ],
434                    },
435                },
436                id="test_action_comment",
437            ),
438            pytest.param(
439                {
440                    "in": "add check-state :OUT // test comment",
441                    "out": {
442                        "objs": [
443                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
444                        ],
445                        "insns": [
446                            InsnComment(comment="test comment"),
447                            InsnKidx(IpFwOpcode.O_CHECK_STATE, kidx=1),
448                        ],
449                    },
450                },
451                id="test_check_state",
452            ),
453            pytest.param(
454                {
455                    "in": "add allow tcp from any to any keep-state :OUT",
456                    "out": {
457                        "objs": [
458                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
459                        ],
460                        "insns": [
461                            InsnKidx(IpFwOpcode.O_PROBE_STATE, kidx=1),
462                            Insn(IpFwOpcode.O_PROTO, arg1=6),
463                            InsnKidx(IpFwOpcode.O_KEEP_STATE, kidx=1),
464                            InsnEmpty(IpFwOpcode.O_ACCEPT),
465                        ],
466                    },
467                },
468                id="test_keep_state",
469            ),
470            pytest.param(
471                {
472                    "in": "add allow tcp from any to any record-state",
473                    "out": {
474                        "objs": [
475                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"),
476                        ],
477                        "insns": [
478                            Insn(IpFwOpcode.O_PROTO, arg1=6),
479                            InsnKidx(IpFwOpcode.O_KEEP_STATE, kidx=1),
480                            InsnEmpty(IpFwOpcode.O_ACCEPT),
481                        ],
482                    },
483                },
484                id="test_record_state",
485            ),
486        ],
487    )
488    def test_add_rule(self, rule):
489        """Tests if the compiled rule is sane and matches the spec"""
490        self.verify_rule(rule["in"], rule["out"])
491
492    @pytest.mark.parametrize(
493        "action",
494        [
495            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
496            pytest.param(
497                (
498                    "abort",
499                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
500                ),
501                id="abort",
502            ),
503            pytest.param(
504                (
505                    "abort6",
506                    Insn(
507                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
508                    ),
509                ),
510                id="abort6",
511            ),
512            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
513            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
514            pytest.param(
515                (
516                    "reject",
517                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
518                ),
519                id="reject",
520            ),
521            pytest.param(
522                (
523                    "reset",
524                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
525                ),
526                id="reset",
527            ),
528            pytest.param(
529                (
530                    "reset6",
531                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
532                ),
533                id="reset6",
534            ),
535            pytest.param(
536                (
537                    "unreach port",
538                    InsnReject(
539                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
540                    ),
541                ),
542                id="unreach_port",
543            ),
544            pytest.param(
545                (
546                    "unreach port",
547                    InsnReject(
548                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
549                    ),
550                ),
551                id="unreach_port",
552            ),
553            pytest.param(
554                (
555                    "unreach needfrag",
556                    InsnReject(
557                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
558                    ),
559                ),
560                id="unreach_needfrag",
561            ),
562            pytest.param(
563                (
564                    "unreach needfrag 1420",
565                    InsnReject(
566                        IpFwOpcode.O_REJECT,
567                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
568                        mtu=1420,
569                    ),
570                ),
571                id="unreach_needfrag_mtu",
572            ),
573            pytest.param(
574                (
575                    "unreach6 port",
576                    Insn(
577                        IpFwOpcode.O_UNREACH6,
578                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
579                    ),
580                ),
581                id="unreach6_port",
582            ),
583            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
584            # TOK_NAT
585            pytest.param(
586                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
587            ),
588            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
589            pytest.param(
590                ("skipto 42", InsnU32(IpFwOpcode.O_SKIPTO, u32=42)), id="skipto_42"
591            ),
592            pytest.param(
593                ("skipto 4200", InsnU32(IpFwOpcode.O_SKIPTO, u32=4200)), id="skipto_4200"
594            ),
595            pytest.param(
596                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
597            ),
598            pytest.param(
599                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
600            ),
601            pytest.param(
602                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
603            ),
604            pytest.param(
605                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
606            ),
607            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
608            pytest.param(
609                ("call 420", InsnU32(IpFwOpcode.O_CALLRETURN, u32=420)), id="call_420"
610            ),
611            # TOK_FORWARD
612            pytest.param(
613                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
614                id="setfib_1",
615                marks=pytest.mark.skip("needs net.fibs>1"),
616            ),
617            pytest.param(
618                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
619                id="setdscp_42",
620            ),
621            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
622            pytest.param(
623                ("return", InsnU32(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
624            ),
625        ],
626    )
627    def test_add_action(self, action):
628        """Tests if the rule action is compiled properly"""
629        rule_in = "add {} ip from any to any".format(action[0])
630        rule_out = {"insns": [action[1]]}
631        self.verify_rule(rule_in, rule_out)
632
633    @pytest.mark.parametrize(
634        "insn",
635        [
636            pytest.param(
637                {
638                    "in": "add prob 0.7 allow ip from any to any",
639                    "out": InsnProb(prob=0.7),
640                },
641                id="test_prob",
642            ),
643            pytest.param(
644                {
645                    "in": "add allow tcp from any to any",
646                    "out": InsnProto(arg1=6),
647                },
648                id="test_proto",
649            ),
650            pytest.param(
651                {
652                    "in": "add allow ip from any to any 57",
653                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
654                },
655                id="test_ports",
656            ),
657        ],
658    )
659    def test_add_single_instruction(self, insn):
660        """Tests if the compiled rule is sane and matches the spec"""
661
662        # Prepare the desired output
663        out = {
664            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
665        }
666        self.verify_rule(insn["in"], out)
667
668    @pytest.mark.parametrize(
669        "opcode",
670        [
671            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
672            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
673        ],
674    )
675    @pytest.mark.parametrize(
676        "params",
677        [
678            pytest.param(
679                {
680                    "in": "57",
681                    "out": [(57, 57)],
682                },
683                id="test_single",
684            ),
685            pytest.param(
686                {
687                    "in": "57-59",
688                    "out": [(57, 59)],
689                },
690                id="test_range",
691            ),
692            pytest.param(
693                {
694                    "in": "57-59,41",
695                    "out": [(57, 59), (41, 41)],
696                },
697                id="test_ranges",
698            ),
699        ],
700    )
701    def test_add_ports(self, params, opcode):
702        if opcode == IpFwOpcode.O_IP_DSTPORT:
703            txt = "add allow ip from any to any " + params["in"]
704        else:
705            txt = "add allow ip from any " + params["in"] + " to any"
706        out = {
707            "insns": [
708                InsnPorts(opcode, port_pairs=params["out"]),
709                InsnEmpty(IpFwOpcode.O_ACCEPT),
710            ]
711        }
712        self.verify_rule(txt, out)
713