Source code for suricata_check.utils.rule

  1"""Module replacing idstools.rule and providing limited but similar functionality.
  2
  3This module is inspired by and mostly uses the same interface Python idstools package. (https://github.com/jasonish/py-idstools)
  4"""
  5
  6import inspect
  7from collections.abc import Hashable, Sequence
  8from dataclasses import dataclass
  9from typing import Optional
 10
 11from suricata_check.utils.regex_provider import get_regex_provider
 12
 13__all__ = ["ParsingError", "Rule", "RuleOption", "parse"]
 14
 15_regex_provider = get_regex_provider()
 16
 17RULE_PATTERN = _regex_provider.compile(
 18    r"^(?P<enabled>#)*[\s#]*(?P<raw>(?P<header>[^()]+)\((?P<options>.*)\)$)",
 19)
 20
 21RULE_ACTIONS = (
 22    "alert",
 23    "config",
 24    "log",
 25    "pass",
 26    "activate",
 27    "dynamic",
 28    "drop",
 29    "reject",
 30    "sdrop",
 31)
 32
 33
[docs] 34@dataclass 35class RuleOption: 36 """Class representing a rule option.""" 37 38 name: str 39 value: Optional[str] = None 40 41 def _to_dict(self: "RuleOption") -> dict[str, Hashable]: 42 """Returns the RuleOption represented as a dictionary.""" 43 return { 44 "name": self.name, 45 "value": self.value, 46 } 47
[docs] 48 def __hash__(self) -> int: 49 """Returns a unique hash that can be used as a fingerprint for the rule option.""" 50 return hash(tuple(sorted(self._to_dict().items())))
51 52
[docs] 53@dataclass 54class Rule: 55 """Class representing a rule.""" 56 57 raw: str 58 header: str 59 enabled: bool 60 action: str 61 proto: str 62 source_addr: str 63 source_port: str 64 direction: str 65 dest_addr: str 66 dest_port: str 67 options: tuple[RuleOption, ...] = () 68 metadata: tuple[str, ...] = () 69 flowbits: tuple[str, ...] = () 70 references: tuple[str, ...] = () 71 72 def __init__(self, *args: tuple, **kwargs: dict) -> None: 73 """Private Init function. 74 75 Use suricata_check.utils.rule.parse() instead to create Rule instances. 76 """ 77 if inspect.stack()[1].function != "parse": 78 raise RuntimeError( 79 "Rule instances must be created using suricata_check.utils.rule.parse()", 80 ) 81 super().__init__(*args, **kwargs) 82
[docs] 83 def __str__(self) -> str: 84 """Returns the raw rule as a string.""" 85 return self.raw
86
[docs] 87 def add_option(self, name: str, value: Optional[str]) -> None: 88 """Adds an option in the rule's options list.""" 89 if not isinstance(name, str): # pyright: ignore[reportUnnecessaryIsInstance] 90 raise TypeError("Option name must be a string") 91 if value is not None and not isinstance( 92 value, 93 str, 94 ): # pyright: ignore[reportUnnecessaryIsInstance] 95 raise TypeError("Option value must be a string") 96 97 self.options = (*self.options, RuleOption(name=name, value=value))
98
[docs] 99 def add_metadata_option(self, value: str) -> None: 100 """Adds metadata options in the rule's metadata list.""" 101 if not isinstance( 102 value, 103 str, 104 ): # pyright: ignore[reportUnnecessaryIsInstance] 105 raise TypeError("Metadata option value must be a string") 106 self.metadata = (*self.metadata, value)
107
[docs] 108 def add_metadata_options(self, values: Sequence[str]) -> None: 109 """Adds metadata options in the rule's metadata list.""" 110 for value in values: 111 if not isinstance( 112 value, 113 str, 114 ): # pyright: ignore[reportUnnecessaryIsInstance] 115 raise TypeError("Metadata option value must be a string") 116 self.metadata = (*self.metadata, *values)
117
[docs] 118 def add_flowbits_option(self, value: str) -> None: 119 """Adds a flowbits option in the rule's flowbits list.""" 120 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance] 121 raise TypeError("Flowbits option value must be a string") 122 self.flowbits = (*self.flowbits, value)
123
[docs] 124 def add_reference_option(self, value: str) -> None: 125 """Adds a reference option in the rule's references list.""" 126 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance] 127 raise TypeError("Reference option value must be a string") 128 self.references = (*self.references, value)
129 130 def _to_dict(self: "Rule") -> dict[str, Hashable]: 131 """Returns the Rule represented as a dictionary.""" 132 return { 133 "raw": self.raw, 134 "header": self.header, 135 "enabled": self.enabled, 136 "action": self.action, 137 "proto": self.proto, 138 "source_addr": self.source_addr, 139 "source_port": self.source_port, 140 "direction": self.direction, 141 "dest_addr": self.dest_addr, 142 "dest_port": self.dest_port, 143 "options": self.options, 144 "metadata": self.metadata, 145 "flowbits": self.flowbits, 146 } 147
[docs] 148 def __hash__(self) -> int: 149 """Returns a unique hash that can be used as a fingerprint for the rule.""" 150 return hash(tuple(sorted(self._to_dict().items())))
151 152
[docs] 153class ParsingError(RuntimeError): 154 """Raised when a rule cannot be parsed by suricata-check. 155 156 Most likely, such a rule is also an invalid Suricata rule. 157 """ 158 159 def __init__(self: "ParsingError", message: str) -> None: 160 """Initializes the `ParsingError` with the raw rule as message.""" 161 super().__init__(message)
162 163
[docs] 164def parse(buffer: str) -> Optional["Rule"]: 165 """Parse a rule stringand return a wrapped `Rule` instance. 166 167 Returns None when the text could not be parsed as a rule. 168 169 :param buffer: A string containing a single Suricata-like rule 170 171 :returns: An instance of of `Rule` representing the parsed rule 172 """ 173 text = buffer.strip() 174 175 m = RULE_PATTERN.match(text) 176 if not m: 177 return None 178 179 rule = Rule() 180 rule.raw = m.group("raw").strip() 181 rule.header = m.group("header").strip() 182 183 if m.group("enabled") == "#": 184 rule.enabled = False 185 else: 186 rule.enabled = True 187 188 header_vals = _regex_provider.split(r"\s+", rule.header) 189 # 7 is the number of expected header fields 190 if len(header_vals) != 7: # noqa: PLR2004 191 return None 192 ( 193 rule.action, 194 rule.proto, 195 rule.source_addr, 196 rule.source_port, 197 rule.direction, 198 rule.dest_addr, 199 rule.dest_port, 200 ) = header_vals 201 202 if rule.action not in RULE_ACTIONS: 203 return None 204 205 options = m.group("options") 206 207 __add_options_to_rule(rule, options) 208 209 return rule
210 211 212def __add_options_to_rule(rule: "Rule", options: str) -> None: 213 while True: 214 if not options: 215 break 216 index = __find_opt_end(options) 217 if index < 0: 218 raise ParsingError(f"end of option not found: {rule.raw}") 219 option = options[:index].strip() 220 options = options[index + 1 :].strip() 221 222 if option.find(":") > -1: 223 name, val = [x.strip() for x in option.split(":", 1)] 224 else: 225 name = option 226 val = None 227 228 __add_option_to_rule(rule, name, val) 229 230 231def __add_metadata_options_to_rule(rule: "Rule", options: str) -> None: 232 while True: 233 if not options: 234 break 235 index = __find_metadata_opt_end(options) 236 option = options[:index].strip() 237 options = options[index + 1 :].strip() 238 239 rule.add_metadata_option(option) 240 241 242def __add_option_to_rule( # noqa: C901, PLR0912 243 rule: "Rule", 244 name: str, 245 val: Optional[str], 246) -> None: 247 if val is not None and not isinstance( 248 val, 249 str, 250 ): # pyright: ignore[reportUnnecessaryIsInstance] 251 raise ParsingError(f"Invalid option value type ({type(val)}): {val}") 252 rule.add_option(name, val) 253 254 if name in ["gid", "sid", "rev"]: 255 try: 256 setattr(rule, name, int(val)) # pyright: ignore[reportArgumentType] 257 except ValueError: 258 raise ParsingError(f"Failed to convert {name} value to int: {val}") 259 elif name == "metadata": 260 if not isinstance(val, str): 261 raise ParsingError(f"Invalid metadata value type ({type(val)}): {val}") 262 __add_metadata_options_to_rule(rule, val) 263 elif name == "flowbits": 264 if not isinstance(val, str): 265 raise ParsingError(f"Invalid flowbits value type ({type(val)}): {val}") 266 rule.add_flowbits_option(val) 267 elif name == "reference": 268 if not isinstance(val, str): 269 raise ParsingError(f"Invalid reference value type ({type(val)}): {val}") 270 rule.add_reference_option(val) 271 elif name == "msg": 272 if not isinstance(val, str): 273 raise ParsingError(f"Invalid msg value type ({type(val)}): {val}") 274 if val.startswith('"') and val.endswith('"'): 275 val = val[1:-1] 276 setattr(rule, name, val) 277 else: 278 setattr(rule, name, val) 279 280 281def __find_opt_end(options: str) -> int: 282 """Find the end of an option (;) handling escapes.""" 283 offset = 0 284 285 while True: 286 i = options[offset:].find(";") 287 if options[offset + i - 1] == "\\": 288 offset += 2 289 else: 290 return offset + i 291 292 293def __find_metadata_opt_end(options: str) -> int: 294 """Find the end of a metadata option (,) handling quotes.""" 295 offset = 0 296 297 while True: 298 i = options[offset:].find(",") 299 if i == -1: 300 i = len(options) + 1 301 if options[0 : offset + i - 1].count('"') == 1: 302 offset += i + 1 303 else: 304 return offset + i