import errno
import json
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union

import pytest
from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
from atf_python.sys.netpfil.ipfw.insns import Insn
from atf_python.sys.netpfil.ipfw.insns import InsnComment
from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
from atf_python.sys.netpfil.ipfw.insns import InsnIp
from atf_python.sys.netpfil.ipfw.insns import InsnIp6
from atf_python.sys.netpfil.ipfw.insns import InsnPorts
from atf_python.sys.netpfil.ipfw.insns import InsnProb
from atf_python.sys.netpfil.ipfw.insns import InsnProto
from atf_python.sys.netpfil.ipfw.insns import InsnReject
from atf_python.sys.netpfil.ipfw.insns import InsnTable
from atf_python.sys.netpfil.ipfw.insns import InsnU32
from atf_python.sys.netpfil.ipfw.insns import InsnKidx
from atf_python.sys.netpfil.ipfw.insns import InsnLookup
from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
from atf_python.sys.netpfil.ipfw.insn_headers import IpFwTableLookupType
from atf_python.sys.netpfil.ipfw.insn_headers import IpFwTableValueType
from atf_python.sys.netpfil.ipfw.ioctl import CTlv
from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
from atf_python.sys.netpfil.ipfw.ioctl import NTlv
from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
from atf_python.sys.netpfil.ipfw.ioctl import RawRule
from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.utils import BaseTest


IPFW_PATH = "/sbin/ipfw"


def differ(w_obj, g_obj, w_stack=None, g_stack=None):
    """Compare a wanted object tree against the one actually produced.

    Both objects are compared by their ``bytes()`` serialization.  When they
    differ, the children in ``obj_list`` are walked pairwise to find the first
    mismatch, and a human-readable report (missing child, extra child, or
    differing object plus hex dump) is printed to stdout.

    Args:
        w_obj: the wanted (expected) object; must support ``bytes()`` and
            expose ``obj_list``, ``obj_name``, ``print_obj()`` and
            ``print_obj_hex()``.
        g_obj: the object that was actually obtained; same interface.
        w_stack: names of the ancestors of ``w_obj``, used only for the
            printed chain.  Callers normally omit it.
        g_stack: names of the ancestors of ``g_obj``; same purpose.

    Returns:
        True if the serialized objects are identical, False otherwise.
    """
    # The defaults are None rather than [] so that ancestor names collected
    # during one comparison cannot leak into the report of a later one.
    w_stack = [] if w_stack is None else w_stack
    g_stack = [] if g_stack is None else g_stack

    if bytes(w_obj) == bytes(g_obj):
        return True

    num_objects = 0  # number of leading children that matched exactly
    for i, w_child in enumerate(w_obj.obj_list):
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # Descend into the first mismatching child with fresh, extended
        # chains; the caller-supplied lists are never mutated.
        if not differ(
            w_child,
            g_child,
            w_stack + [w_obj.obj_name],
            g_stack + [g_obj.obj_name],
        ):
            return False
        break

    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        # Every wanted child matched, but the obtained object has more.
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False

    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False


class TestAddRule(BaseTest):
    """Checks the requests produced by ``ipfw add ...`` command lines.

    Each test builds the expected IP_FW_XADD request from instruction objects
    and compares it byte-for-byte with the record captured by
    ``DebugIoReader`` for the given command text.
    """

    def compile_rule(self, out):
        """Build the expected IP_FW_XADD request from a test description.

        Args:
            out: dict with a mandatory ``"insns"`` list (rule instructions),
                an optional ``"objs"`` list (named-object TLVs) and an
                optional ``"rulenum"`` (defaults to 0).

        Returns:
            An ``IpFwXRule`` wrapping the optional table-name list TLV
            followed by the rule TLV.
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data):
        """Assert that the command ``in_data`` yields the request ``out_data``.

        Args:
            in_data: ipfw command-line text, e.g. ``"add allow ip from any
                to any"``.
            out_data: expected-output description accepted by
                ``compile_rule``.

        On mismatch a detailed diff is printed before the final assertion
        fails, so the pytest output shows both objects.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        # NOTE(review): DebugIoReader presumably runs IPFW_PATH in a mode
        # that records the ioctl requests instead of applying them -- confirm.
        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        if not differ(expected, got):
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnKidx(IpFwOpcode.O_IP_DST_LOOKUP, kidx=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP
                                ),
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_no_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup mark:0xf00baa1 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_MARK,
                                    bitmask=True,
                                ),
                                bitmask=0xf00baa1,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_u32_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup src-mac:1a:2b:3c:4d:5e:6f BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_SRC_MAC,
                                    bitmask=True,
                                ),
                                bitmask="1a:2b:3c:4d:5e:6f",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_mac_mask",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip4:1715004 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP4,
                                    bitmask=True,
                                ),
                                bitmask="60.43.26.0",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_dst_ip4_numeric",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup src-ip4:0.0.0.255 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_SRC_IP4,
                                    bitmask=True,
                                ),
                                bitmask="0.0.0.255",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_src_ip4_addr",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup jail:0.0.252.128 BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_JAIL,
                                    bitmask=True,
                                ),
                                bitmask=64640,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_jail_addr",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to any lookup dst-ip6:ffff:ffff:f00:baaa:b00c:: BBB",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_IP_SRC_LOOKUP, kidx=1),
                            InsnLookup(
                                IpFwOpcode.O_TABLE_LOOKUP,
                                kidx=2,
                                arg1=InsnLookup.compile_arg1(
                                    lookup_type=IpFwTableLookupType.LOOKUP_DST_IP6,
                                    bitmask=True,
                                ),
                                bitmask="ffff:ffff:f00:baaa:b00c::",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_lookup_dst_ip6",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,16777215) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_TAG
                                ),
                                value=16777215,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_legacy",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nat=1231) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NAT
                                ),
                                value=1231,
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nat",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nh4=10.20.30.40) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NH4
                                ),
                                value="10.20.30.40",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nh4",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA,nh6=ff02:1234:b00c::abcd) to any",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                        ],
                        "insns": [
                            InsnLookup(
                                IpFwOpcode.O_IP_SRC_LOOKUP,
                                kidx=1,
                                arg1=InsnLookup.compile_arg1(
                                    value_type=IpFwTableValueType.TVALUE_NH6
                                ),
                                value="ff02:1234:b00c::abcd",
                            ),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables_check_value_nh6",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
            pytest.param(
                {
                    "in": "add tcp-setmss 123 ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_ACTION, kidx=1),
                            Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123),
                        ],
                    },
                },
                id="test_eaction_tcp-setmss",
            ),
            pytest.param(
                {
                    "in": "add eaction nptv6 AAA ip from any to 1.2.3.4",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="nptv6"),
                            NTlv(0, idx=2, name="AAA"),
                        ],
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_ACTION, kidx=1),
                            InsnKidx(IpFwOpcode.O_EXTERNAL_INSTANCE, kidx=2),
                        ],
                    },
                },
                id="test_eaction_nptv6",
            ),
            pytest.param(
                {
                    "in": "add // test comment",
                    "out": {
                        "insns": [
                            InsnComment(comment="test comment"),
                            Insn(IpFwOpcode.O_COUNT),
                        ],
                    },
                },
                id="test_action_comment",
            ),
            pytest.param(
                {
                    "in": "add check-state :OUT // test comment",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnComment(comment="test comment"),
                            InsnKidx(IpFwOpcode.O_CHECK_STATE, kidx=1),
                        ],
                    },
                },
                id="test_check_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any keep-state :OUT",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"),
                        ],
                        "insns": [
                            InsnKidx(IpFwOpcode.O_PROBE_STATE, kidx=1),
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            InsnKidx(IpFwOpcode.O_KEEP_STATE, kidx=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_keep_state",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any record-state",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"),
                        ],
                        "insns": [
                            Insn(IpFwOpcode.O_PROTO, arg1=6),
                            InsnKidx(IpFwOpcode.O_KEEP_STATE, kidx=1),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_record_state",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", InsnU32(IpFwOpcode.O_SKIPTO, u32=42)), id="skipto_42"
            ),
            pytest.param(
                ("skipto 4200", InsnU32(IpFwOpcode.O_SKIPTO, u32=4200)),
                id="skipto_4200",
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)),
                id="netgraph_42",
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)),
                id="divert_natd",
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", InsnU32(IpFwOpcode.O_CALLRETURN, u32=420)),
                id="call_420",
            ),
            # TOK_FORWARD
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnU32(IpFwOpcode.O_CALLRETURN, is_not=True)),
                id="return",
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        # action is a (command-text, expected-instruction) pair.
        rule_in = "add {} ip from any to any".format(action[0])
        rule_out = {"insns": [action[1]]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests if the compiled rule is sane and matches the spec"""

        # Prepare the desired output: the instruction under test followed by
        # the accept action that every input rule in this test uses.
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests port specifications on the source and destination side"""
        # The port list goes after the address it qualifies, so its position
        # in the command text depends on which opcode is being exercised.
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)