Source code for suricata_check.utils.rule

  1"""Module replacing idstools.rule and providing limited but similar functionality.
  2
  3This module is inspired by and mostly uses the same interface Python idstools package. (https://github.com/jasonish/py-idstools)
  4"""
  5
  6import inspect
  7from collections.abc import Hashable, Sequence
  8from dataclasses import dataclass
  9from typing import Optional
 10
 11from suricata_check.utils.regex_provider import get_regex_provider
 12
 13_regex_provider = get_regex_provider()
 14
 15RULE_PATTERN = _regex_provider.compile(
 16    r"^(?P<enabled>#)*[\s#]*(?P<raw>(?P<header>[^()]+)\((?P<options>.*)\)$)"
 17)
 18
 19RULE_ACTIONS = (
 20    "alert",
 21    "config",
 22    "log",
 23    "pass",
 24    "activate",
 25    "dynamic",
 26    "drop",
 27    "reject",
 28    "sdrop",
 29)
 30
 31
[docs] 32@dataclass 33class RuleOption: 34 """Class representing a rule option.""" 35 36 name: str 37 value: Optional[str] = None 38 39 def _to_dict(self: "RuleOption") -> dict[str, Hashable]: 40 """Returns the RuleOption represented as a dictionary.""" 41 return { 42 "name": self.name, 43 "value": self.value, 44 } 45
[docs] 46 def __hash__(self) -> int: 47 """Returns a unique hash that can be used as a fingerprint for the rule option.""" 48 return hash(tuple(sorted(self._to_dict().items())))
49 50
[docs] 51@dataclass 52class Rule: 53 """Class representing a rule.""" 54 55 raw: str 56 header: str 57 enabled: bool 58 action: str 59 proto: str 60 source_addr: str 61 source_port: str 62 direction: str 63 dest_addr: str 64 dest_port: str 65 options: tuple[RuleOption, ...] = () 66 metadata: tuple[str, ...] = () 67 flowbits: tuple[str, ...] = () 68 references: tuple[str, ...] = () 69 70 def __init__(self, *args: tuple, **kwargs: dict) -> None: 71 """Private Init function. 72 73 Use suricata_check.utils.rule.parse() instead to create Rule instances. 74 """ 75 if inspect.stack()[1].function != "parse": 76 raise RuntimeError( 77 "Rule instances must be created using suricata_check.utils.rule.parse()" 78 ) 79 super().__init__(*args, **kwargs) 80
[docs] 81 def add_option(self, name: str, value: Optional[str]) -> None: 82 """Adds an option in the rule's options list.""" 83 if not isinstance(name, str): # pyright: ignore[reportUnnecessaryIsInstance] 84 raise TypeError("Option name must be a string") 85 if value is not None and not isinstance( 86 value, str 87 ): # pyright: ignore[reportUnnecessaryIsInstance] 88 raise TypeError("Option value must be a string") 89 90 self.options = (*self.options, RuleOption(name=name, value=value))
91
[docs] 92 def add_metadata_options(self, values: Sequence[str]) -> None: 93 """Adds metadata options in the rule's metadata list.""" 94 for value in values: 95 if not isinstance( 96 value, str 97 ): # pyright: ignore[reportUnnecessaryIsInstance] 98 raise TypeError("Metadata option value must be a string") 99 self.metadata = (*self.metadata, *values)
100
[docs] 101 def add_flowbits_option(self, value: str) -> None: 102 """Adds a flowbits option in the rule's flowbits list.""" 103 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance] 104 raise TypeError("Flowbits option value must be a string") 105 self.flowbits = (*self.flowbits, value)
106
[docs] 107 def add_reference_option(self, value: str) -> None: 108 """Adds a reference option in the rule's references list.""" 109 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance] 110 raise TypeError("Reference option value must be a string") 111 self.references = (*self.references, value)
112 113 def _to_dict(self: "Rule") -> dict[str, Hashable]: 114 """Returns the Rule represented as a dictionary.""" 115 return { 116 "raw": self.raw, 117 "header": self.header, 118 "enabled": self.enabled, 119 "action": self.action, 120 "proto": self.proto, 121 "source_addr": self.source_addr, 122 "source_port": self.source_port, 123 "direction": self.direction, 124 "dest_addr": self.dest_addr, 125 "dest_port": self.dest_port, 126 "options": self.options, 127 "metadata": self.metadata, 128 "flowbits": self.flowbits, 129 } 130
[docs] 131 def __hash__(self) -> int: 132 """Returns a unique hash that can be used as a fingerprint for the rule.""" 133 return hash(tuple(sorted(self._to_dict().items())))
134 135
[docs] 136class ParsingError(RuntimeError): 137 """Raised when a rule cannot be parsed by suricata-check. 138 139 Most likely, such a rule is also an invalid Suricata rule. 140 """ 141 142 def __init__(self: "ParsingError", message: str) -> None: 143 """Initializes the `ParsingError` with the raw rule as message.""" 144 super().__init__(message)
145 146
[docs] 147def parse(buffer: str) -> Optional["Rule"]: 148 """Parse a rule stringand return a wrapped `Rule` instance. 149 150 Returns None when the text could not be parsed as a rule. 151 152 :param buffer: A string containing a single Suricata-like rule 153 154 :returns: An instance of of `Rule` representing the parsed rule 155 """ 156 text = buffer.strip() 157 158 m = RULE_PATTERN.match(text) 159 if not m: 160 return None 161 162 rule = Rule() 163 rule.raw = m.group("raw").strip() 164 rule.header = m.group("header").strip() 165 166 if m.group("enabled") == "#": 167 rule.enabled = False 168 else: 169 rule.enabled = True 170 171 header_vals = _regex_provider.split(r"\s+", rule.header) 172 # 7 is the number of expected header fields 173 if len(header_vals) != 7: # noqa: PLR2004 174 return None 175 ( 176 rule.action, 177 rule.proto, 178 rule.source_addr, 179 rule.source_port, 180 rule.direction, 181 rule.dest_addr, 182 rule.dest_port, 183 ) = header_vals 184 185 if rule.action not in RULE_ACTIONS: 186 return None 187 188 options = m.group("options") 189 190 __add_options_to_rule(text, rule, options) 191 192 return rule
193 194 195def __add_options_to_rule(rule_text: str, rule: "Rule", options: str) -> None: 196 while True: 197 if not options: 198 break 199 index = __find_opt_end(options) 200 if index < 0: 201 raise ParsingError("end of option not found: {}".format(rule_text)) 202 option = options[:index].strip() 203 options = options[index + 1 :].strip() 204 205 if option.find(":") > -1: 206 name, val = [x.strip() for x in option.split(":", 1)] 207 else: 208 name = option 209 val = None 210 211 __add_option_to_rule(rule, name, val) 212 213 214def __add_option_to_rule( # noqa: C901, PLR0912 215 rule: "Rule", name: str, val: Optional[str] 216) -> None: 217 if val is not None and not isinstance( 218 val, str 219 ): # pyright: ignore[reportUnnecessaryIsInstance] 220 raise ParsingError(f"Invalid option value type ({type(val)}): {val}") 221 rule.add_option(name, val) 222 223 if name in ["gid", "sid", "rev"]: 224 try: 225 setattr(rule, name, int(val)) # pyright: ignore[reportArgumentType] 226 except ValueError: 227 raise ParsingError(f"Failed to convert {name} value to int: {val}") 228 elif name == "metadata": 229 if not isinstance(val, str): 230 raise ParsingError(f"Invalid metadata value type ({type(val)}): {val}") 231 rule.add_metadata_options([v.strip() for v in val.split(",")]) 232 elif name == "flowbits": 233 if not isinstance(val, str): 234 raise ParsingError(f"Invalid flowbits value type ({type(val)}): {val}") 235 rule.add_flowbits_option(val) 236 elif name == "reference": 237 if not isinstance(val, str): 238 raise ParsingError(f"Invalid reference value type ({type(val)}): {val}") 239 rule.add_reference_option(val) 240 elif name == "msg": 241 if not isinstance(val, str): 242 raise ParsingError(f"Invalid msg value type ({type(val)}): {val}") 243 if val.startswith('"') and val.endswith('"'): 244 val = val[1:-1] 245 setattr(rule, name, val) 246 else: 247 setattr(rule, name, val) 248 249 250def __find_opt_end(options: str) -> int: 251 """Find the end of an option (;) handling escapes.""" 252 offset = 0 253 254 while True: 255 i = options[offset:].find(";") 256 if options[offset + i - 1] == "\\": 257 offset += 2 258 else: 259 return offset + i