#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-only
#
# Implementation based on
# Gerth, R., Peled, D., Vardi, M.Y., Wolper, P. (1996).
# Simple On-the-fly Automatic Verification of Linear Temporal Logic.
# https://doi.org/10.1007/978-0-387-34892-6_1
# With extra optimizations

from ply.lex import lex
from ply.yacc import yacc

# Grammar:
#       ltl ::= opd | ( ltl ) | ltl binop ltl | unop ltl
#
# Operands (opd):
#       true, false, user-defined names
#
# Unary Operators (unop):
#       always
#       eventually
#       next
#       not
#
# Binary Operators (binop):
#       until
#       and
#       or
#       imply
#       equivalent (listed here, but no token or parser rule below implements it)

tokens = (
    'AND',
    'OR',
    'IMPLY',
    'UNTIL',
    'ALWAYS',
    'EVENTUALLY',
    'NEXT',
    'VARIABLE',
    'LITERAL',
    'NOT',
    'LPAREN',
    'RPAREN',
    'ASSIGN',
)

# Keywords are lower case while VARIABLE only matches upper-case letters,
# digits and underscores, so a keyword can never be lexed as a variable.
t_AND = r'and'
t_OR = r'or'
t_IMPLY = r'imply'
t_UNTIL = r'until'
t_ALWAYS = r'always'
t_NEXT = r'next'
t_EVENTUALLY = r'eventually'
t_VARIABLE = r'[A-Z_0-9]+'
t_LITERAL = r'true|false'
t_NOT = r'not'
t_LPAREN = r'\('
t_RPAREN = r'\)'
t_ASSIGN = r'='
t_ignore_COMMENT = r'\#.*'
t_ignore = ' \t\n'

def t_error(t):
    # Lexer error hook: abort on the first character no token rule matches.
    raise ValueError(f"Illegal character '{t.value[0]}'")

lexer = lex()

class GraphNode:
    """One node of the automaton being built from an LTL formula.

    Attributes:
        init:     set by create_graph() when the node is an initial node
        outgoing: successor nodes, filled in by create_graph()
        labels:   string form of the non-temporal formulas in `old`,
                  filled in by create_graph()
        incoming: predecessor nodes
        new:      formulas that still have to be processed for this node
        old:      formulas that have already been processed for this node
        next:     formulas that seed the `new` set of the successor node
        id:       unique number, also used for ordering (see __lt__)
    """
    uid = 0

    def __init__(self, incoming: set['GraphNode'], new, old, _next):
        self.init = False
        self.outgoing = set()
        self.labels = set()
        # Copy every set: callers routinely pass the sets of another node
        self.incoming = incoming.copy()
        self.new = new.copy()
        self.old = old.copy()
        self.next = _next.copy()
        self.id = GraphNode.uid
        GraphNode.uid += 1

    def expand(self, node_set):
        """Process this node and return the updated set of finished nodes.

        While `new` is not empty, one formula is removed from it and its own
        expand() decides how the node is refined. Once `new` is empty the node
        is finished: it is either merged into an equivalent node already in
        `node_set` (same `old` and `next`), or added to `node_set` and its
        successor (whose `new` is this node's `next`) is expanded in turn.
        """
        if not self.new:
            for nd in node_set:
                if nd.old == self.old and nd.next == self.next:
                    # Equivalent node already exists, only merge the edges
                    nd.incoming |= self.incoming
                    return node_set

            new_current_node = GraphNode({self}, self.next, set(), set())
            return new_current_node.expand({self} | node_set)
        n = self.new.pop()
        return n.expand(self, node_set)

    def __lt__(self, other):
        # Allows sorted() on nodes, ordering them by creation id
        return self.id < other.id

class ASTNode:
    """Mutable wrapper around an operator/operand object (`op`).

    The wrapper allows `op` to be swapped in place (normalize(), negate(),
    sub-expression substitution) without touching the parent's reference.
    Equality is object identity.
    """
    uid = 1

    def __init__(self, op):
        self.op = op
        self.id = ASTNode.uid
        ASTNode.uid += 1

    def __hash__(self):
        return hash(self.op)

    def __eq__(self, other):
        return self is other

    def __iter__(self):
        # Pre-order walk: this node first, then everything below `op`.
        # `op` is read lazily, so a caller replacing self.op right after
        # receiving this node walks the replacement.
        yield self
        yield from self.op

    def negate(self):
        """Replace `op` by its negation, in place, and return self."""
        self.op = self.op.negate()
        return self

    def expand(self, node, node_set):
        """Delegate graph expansion of this formula to `op`."""
        return self.op.expand(self, node, node_set)

    def __str__(self):
        if isinstance(self.op, Literal):
            return str(self.op.value)
        if isinstance(self.op, Variable):
            return self.op.name.lower()
        return "val" + str(self.id)

    def normalize(self):
        # Get rid of:
        #   - ALWAYS
        #   - EVENTUALLY
        #   - IMPLY
        # And move all the NOT to be inside
        self.op = self.op.normalize()
        return self

def _refers_to_same_atom(a: ASTNode, b: ASTNode) -> bool:
    # True if both nodes denote the same atomic proposition: either the very
    # same node, or two nodes wrapping a Variable with the same name. The
    # parser creates a new ASTNode/Variable for every occurrence of a name and
    # Variable.negate() wraps a new ASTNode, so identity alone never matches.
    if a is b:
        return True
    return (isinstance(a.op, Variable) and isinstance(b.op, Variable)
            and a.op.name == b.op.name)

class BinaryOp:
    """Base class of the operators taking two operands."""
    op_str = "not_supported"

    def __init__(self, left: ASTNode, right: ASTNode):
        self.left = left
        self.right = right

    def __hash__(self):
        return hash((self.left, self.right))

    def __iter__(self):
        yield from self.left
        yield from self.right

    def normalize(self):
        raise NotImplementedError

    def negate(self):
        raise NotImplementedError

    def _is_temporal(self):
        raise NotImplementedError

    def is_temporal(self):
        """Return True if this operator or anything below it is temporal."""
        if self.left.op.is_temporal():
            return True
        if self.right.op.is_temporal():
            return True
        return self._is_temporal()

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        raise NotImplementedError

class AndOp(BinaryOp):
    op_str = '&&'

    def normalize(self):
        return self

    def negate(self):
        # !(P & Q) === !P | !Q
        return OrOp(self.left.negate(), self.right.negate())

    def _is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        if not n.op.is_temporal():
            # Purely propositional: keep the whole expression as one formula
            node.old.add(n)
            return node.expand(node_set)

        # Both operands have to be processed for the same node
        tmp = GraphNode(node.incoming,
                        node.new | ({n.op.left, n.op.right} - node.old),
                        node.old | {n},
                        node.next)
        return tmp.expand(node_set)

class OrOp(BinaryOp):
    op_str = '||'

    def normalize(self):
        return self

    def negate(self):
        # !(P | Q) === !P & !Q
        return AndOp(self.left.negate(), self.right.negate())

    def _is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        if not n.op.is_temporal():
            # Purely propositional: keep the whole expression as one formula
            node.old |= {n}
            return node.expand(node_set)

        # Split the node in two, one alternative per operand
        node1 = GraphNode(node.incoming,
                          node.new | ({n.op.left} - node.old),
                          node.old | {n},
                          node.next)
        node2 = GraphNode(node.incoming,
                          node.new | ({n.op.right} - node.old),
                          node.old | {n},
                          node.next)
        return node2.expand(node1.expand(node_set))

class UntilOp(BinaryOp):
    def normalize(self):
        return self

    def negate(self):
        # !(P U Q) === !P V !Q
        return VOp(self.left.negate(), self.right.negate())

    def _is_temporal(self):
        return True

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # Either the left operand is processed now and the until formula is
        # carried over to the successor ...
        node1 = GraphNode(node.incoming,
                          node.new | ({n.op.left} - node.old),
                          node.old | {n},
                          node.next | {n})
        # ... or the right operand is processed now
        node2 = GraphNode(node.incoming,
                          node.new | ({n.op.right} - node.old),
                          node.old | {n},
                          node.next)
        return node2.expand(node1.expand(node_set))

class VOp(BinaryOp):
    """The dual of UntilOp (see UntilOp.negate() and AlwaysOp.normalize())."""

    def normalize(self):
        return self

    def negate(self):
        # !(P V Q) === !P U !Q
        return UntilOp(self.left.negate(), self.right.negate())

    def _is_temporal(self):
        return True

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # Either the right operand is processed now and the formula is
        # carried over to the successor ...
        node1 = GraphNode(node.incoming,
                          node.new | ({n.op.right} - node.old),
                          node.old | {n},
                          node.next | {n})
        # ... or both operands are processed now
        node2 = GraphNode(node.incoming,
                          node.new | ({n.op.left, n.op.right} - node.old),
                          node.old | {n},
                          node.next)
        return node2.expand(node1.expand(node_set))

class ImplyOp(BinaryOp):
    """Implication. Removed by normalize(), hence it has no expand()."""

    def normalize(self):
        # P -> Q === !P | Q
        return OrOp(self.left.negate(), self.right)

    def _is_temporal(self):
        return False

    def negate(self):
        # !(P -> Q) === !(!P | Q) === P & !Q
        return AndOp(self.left, self.right.negate())

class UnaryOp:
    """Base class of the operators taking a single operand."""

    def __init__(self, child: ASTNode):
        self.child = child

    def __iter__(self):
        yield from self.child

    def __hash__(self):
        return hash(self.child)

    def normalize(self):
        raise NotImplementedError

    def _is_temporal(self):
        raise NotImplementedError

    def is_temporal(self):
        """Return True if this operator or anything below it is temporal."""
        if self.child.op.is_temporal():
            return True
        return self._is_temporal()

    def negate(self):
        raise NotImplementedError

class EventuallyOp(UnaryOp):
    def __str__(self):
        return "eventually " + str(self.child)

    def normalize(self):
        # <>F == true U F
        return UntilOp(ASTNode(Literal(True)), self.child)

    def _is_temporal(self):
        return True

    def negate(self):
        # !<>F == [](!F)
        return AlwaysOp(self.child.negate()).normalize()

class AlwaysOp(UnaryOp):
    def normalize(self):
        # []F === !(true U !F) == false V F
        new = ASTNode(Literal(False))
        return VOp(new, self.child)

    def _is_temporal(self):
        return True

    def negate(self):
        # ![]F == <>(!F)
        return EventuallyOp(self.child.negate()).normalize()

class NextOp(UnaryOp):
    def normalize(self):
        return self

    def _is_temporal(self):
        return True

    def negate(self):
        # not (next A) == next (not A)
        self.child = self.child.negate()
        return self

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # The operand is deferred to the successor node
        tmp = GraphNode(node.incoming,
                        node.new,
                        node.old | {n},
                        node.next | {n.op.child})
        return tmp.expand(node_set)

class NotOp(UnaryOp):
    def __str__(self):
        return "!" + str(self.child)

    def normalize(self):
        # Push the negation into the operand
        return self.child.op.negate()

    def negate(self):
        # !!F === F
        return self.child.op

    def _is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # The node already holds the operand un-negated: contradiction, so
        # the node is dropped.
        for f in node.old:
            if _refers_to_same_atom(n.op.child, f):
                return node_set
        node.old |= {n}
        return node.expand(node_set)

class Variable:
    """An atomic proposition, identified by its name."""

    def __init__(self, name: str):
        self.name = name

    def __hash__(self):
        return hash(self.name)

    def __iter__(self):
        yield from ()

    def negate(self):
        new = ASTNode(self)
        return NotOp(new)

    def normalize(self):
        return self

    def is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # The node already holds the negation of this variable:
        # contradiction, so the node is dropped. Members of `old` are
        # ASTNode, therefore the operator type must be checked on `f.op`.
        for f in node.old:
            if isinstance(f.op, NotOp) and _refers_to_same_atom(f.op.child, n):
                return node_set
        node.old |= {n}
        return node.expand(node_set)

class Literal:
    """The constant true or false."""

    def __init__(self, value: bool):
        self.value = value

    def __iter__(self):
        yield from ()

    def __hash__(self):
        return hash(self.value)

    def __str__(self):
        if self.value:
            return "true"
        return "false"

    def negate(self):
        self.value = not self.value
        return self

    def normalize(self):
        return self

    def is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        if not n.op.value:
            # "false" cannot hold: the node is dropped
            return node_set
        node.old |= {n}
        return node.expand(node_set)

# NOTE: ply.yacc reads the docstrings of the p_* functions as grammar
# productions. Use comments, not docstrings, to document them.

def p_spec(p):
    '''
    spec : assign
         | assign spec
    '''
    # Result: list of (name, ASTNode) tuples
    if len(p) == 3:
        p[2].append(p[1])
        p[0] = p[2]
    else:
        p[0] = [p[1]]

def p_assign(p):
    '''
    assign : VARIABLE ASSIGN ltl
    '''
    p[0] = (p[1], p[3])

def p_ltl(p):
    '''
    ltl : opd
        | binop
        | unop
    '''
    p[0] = p[1]

def p_opd(p):
    '''
    opd : VARIABLE
        | LITERAL
        | LPAREN ltl RPAREN
    '''
    if p[1] == "true":
        p[0] = ASTNode(Literal(True))
    elif p[1] == "false":
        p[0] = ASTNode(Literal(False))
    elif p[1] == '(':
        p[0] = p[2]
    else:
        p[0] = ASTNode(Variable(p[1]))

def p_unop(p):
    '''
    unop : ALWAYS ltl
         | EVENTUALLY ltl
         | NEXT ltl
         | NOT ltl
    '''
    if p[1] == "always":
        op = AlwaysOp(p[2])
    elif p[1] == "eventually":
        op = EventuallyOp(p[2])
    elif p[1] == "next":
        op = NextOp(p[2])
    elif p[1] == "not":
        op = NotOp(p[2])
    else:
        raise ValueError(f"Invalid unary operator {p[1]}")

    p[0] = ASTNode(op)

def p_binop(p):
    '''
    binop : opd UNTIL ltl
          | opd AND ltl
          | opd OR ltl
          | opd IMPLY ltl
    '''
    if p[2] == "and":
        op = AndOp(p[1], p[3])
    elif p[2] == "until":
        op = UntilOp(p[1], p[3])
    elif p[2] == "or":
        op = OrOp(p[1], p[3])
    elif p[2] == "imply":
        op = ImplyOp(p[1], p[3])
    else:
        raise ValueError(f"Invalid binary operator {p[2]}")

    p[0] = ASTNode(op)

parser = yacc()

def parse_ltl(s: str) -> ASTNode:
    """Parse a specification and return the syntax tree of its RULE.

    The specification is a sequence of "NAME = <LTL formula>" assignments.
    The assignment named RULE is the formula to be returned; every other
    assignment is a sub-expression which replaces the variables of the same
    name found while walking RULE.

    Raises:
        ValueError: the parser produced no result, or RULE is not defined.
    """
    spec = parser.parse(s)
    if spec is None:
        # Without this check a failed parse shows up as an obscure
        # "'NoneType' object is not iterable" TypeError below.
        raise ValueError("Failed to parse the LTL specification")

    rule = None
    subexpr = {}

    for assign in spec:
        if assign[0] == "RULE":
            rule = assign[1]
        else:
            subexpr[assign[0]] = assign[1]

    if rule is None:
        raise ValueError("Please define your specification in the \"RULE = <LTL spec>\" format")

    # The walk is lazy (see ASTNode.__iter__), so variables inside a
    # substituted sub-expression are visited and substituted as well.
    for node in rule:
        if not isinstance(node.op, Variable):
            continue
        replace = subexpr.get(node.op.name)
        if replace is not None:
            node.op = replace.op

    return rule

def create_graph(s: str):
    """Build the automaton graph of the specification `s`.

    Returns a tuple (atoms, graph, ltl):
        atoms: sorted list of the variable names left in the normalized rule
        graph: list of GraphNode sorted by id, with ids renumbered from 0
        ltl:   the normalized syntax tree of the rule
    """
    atoms = set()

    ltl = parse_ltl(s)
    for c in ltl:
        c.normalize()
        if isinstance(c.op, Variable):
            atoms.add(c.op.name)

    # `init` is only a marker: it is not part of the returned graph, and the
    # nodes having it as predecessor are flagged as initial below.
    init = GraphNode(set(), set(), set(), set())
    head = GraphNode({init}, {ltl}, set(), set())
    graph = sorted(head.expand(set()))

    for i, node in enumerate(graph):
        # The id assignment during graph generation has gaps. Reassign them
        node.id = i

        for incoming in node.incoming:
            if incoming is init:
                node.init = True
            else:
                incoming.outgoing.add(node)
        for o in node.old:
            if not o.op.is_temporal():
                node.labels.add(str(o))

    return sorted(atoms), graph, ltl