Source code for suricata_check.utils.rule

  1"""Module replacing idstools.rule and providing limited but similar functionality.
  2
  3This module is inspired by and mostly uses the same interface Python idstools package. (https://github.com/jasonish/py-idstools)
  4"""
  5
  6import inspect
  7from collections.abc import Hashable, Sequence
  8from dataclasses import dataclass
  9from typing import Optional
 10
 11from suricata_check.utils.regex_provider import get_regex_provider
 12
 13__all__ = ["ParsingError", "Rule", "RuleOption", "parse"]
 14
 15_regex_provider = get_regex_provider()
 16
 17RULE_PATTERN = _regex_provider.compile(
 18    r"^(?P<enabled>#)*[\s#]*(?P<raw>(?P<header>[^()]+)\((?P<options>.*)\)$)",
 19)
 20
 21RULE_ACTIONS = (
 22    "alert",
 23    "config",
 24    "log",
 25    "pass",
 26    "activate",
 27    "dynamic",
 28    "drop",
 29    "reject",
 30    "sdrop",
 31)
 32
 33
[docs] 34@dataclass 35class RuleOption: 36 """Class representing a rule option.""" 37 38 name: str 39 value: Optional[str] = None 40 41 def _to_dict(self: "RuleOption") -> dict[str, Hashable]: 42 """Returns the RuleOption represented as a dictionary.""" 43 return { 44 "name": self.name, 45 "value": self.value, 46 } 47
[docs] 48 def __hash__(self) -> int: 49 """Returns a unique hash that can be used as a fingerprint for the rule option.""" 50 return hash(tuple(sorted(self._to_dict().items())))
51 52
[docs] 53@dataclass 54class Rule: 55 """Class representing a rule.""" 56 57 raw: str 58 header: str 59 enabled: bool 60 action: str 61 proto: str 62 source_addr: str 63 source_port: str 64 direction: str 65 dest_addr: str 66 dest_port: str 67 options: tuple[RuleOption, ...] = () 68 metadata: tuple[str, ...] = () 69 flowbits: tuple[str, ...] = () 70 references: tuple[str, ...] = () 71 72 def __init__(self, *args: tuple, **kwargs: dict) -> None: 73 """Private Init function. 74 75 Use suricata_check.utils.rule.parse() instead to create Rule instances. 76 """ 77 if inspect.stack()[1].function != "parse": 78 raise RuntimeError( 79 "Rule instances must be created using suricata_check.utils.rule.parse()", 80 ) 81 super().__init__(*args, **kwargs) 82
[docs] 83 def __str__(self) -> str: 84 """Returns the raw rule as a string.""" 85 return self.raw
86
[docs] 87 def add_option(self, name: str, value: Optional[str]) -> None: 88 """Adds an option in the rule's options list.""" 89 if not isinstance(name, str): # pyright: ignore[reportUnnecessaryIsInstance] 90 raise TypeError("Option name must be a string") 91 if value is not None and not isinstance( 92 value, 93 str, 94 ): # pyright: ignore[reportUnnecessaryIsInstance] 95 raise TypeError("Option value must be a string") 96 97 self.options = (*self.options, RuleOption(name=name, value=value))
98
[docs] 99 def add_metadata_options(self, values: Sequence[str]) -> None: 100 """Adds metadata options in the rule's metadata list.""" 101 for value in values: 102 if not isinstance( 103 value, 104 str, 105 ): # pyright: ignore[reportUnnecessaryIsInstance] 106 raise TypeError("Metadata option value must be a string") 107 self.metadata = (*self.metadata, *values)
108
[docs] 109 def add_flowbits_option(self, value: str) -> None: 110 """Adds a flowbits option in the rule's flowbits list.""" 111 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance] 112 raise TypeError("Flowbits option value must be a string") 113 self.flowbits = (*self.flowbits, value)
114
[docs] 115 def add_reference_option(self, value: str) -> None: 116 """Adds a reference option in the rule's references list.""" 117 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance] 118 raise TypeError("Reference option value must be a string") 119 self.references = (*self.references, value)
120 121 def _to_dict(self: "Rule") -> dict[str, Hashable]: 122 """Returns the Rule represented as a dictionary.""" 123 return { 124 "raw": self.raw, 125 "header": self.header, 126 "enabled": self.enabled, 127 "action": self.action, 128 "proto": self.proto, 129 "source_addr": self.source_addr, 130 "source_port": self.source_port, 131 "direction": self.direction, 132 "dest_addr": self.dest_addr, 133 "dest_port": self.dest_port, 134 "options": self.options, 135 "metadata": self.metadata, 136 "flowbits": self.flowbits, 137 } 138
[docs] 139 def __hash__(self) -> int: 140 """Returns a unique hash that can be used as a fingerprint for the rule.""" 141 return hash(tuple(sorted(self._to_dict().items())))
142 143
[docs] 144class ParsingError(RuntimeError): 145 """Raised when a rule cannot be parsed by suricata-check. 146 147 Most likely, such a rule is also an invalid Suricata rule. 148 """ 149 150 def __init__(self: "ParsingError", message: str) -> None: 151 """Initializes the `ParsingError` with the raw rule as message.""" 152 super().__init__(message)
153 154
[docs] 155def parse(buffer: str) -> Optional["Rule"]: 156 """Parse a rule stringand return a wrapped `Rule` instance. 157 158 Returns None when the text could not be parsed as a rule. 159 160 :param buffer: A string containing a single Suricata-like rule 161 162 :returns: An instance of of `Rule` representing the parsed rule 163 """ 164 text = buffer.strip() 165 166 m = RULE_PATTERN.match(text) 167 if not m: 168 return None 169 170 rule = Rule() 171 rule.raw = m.group("raw").strip() 172 rule.header = m.group("header").strip() 173 174 if m.group("enabled") == "#": 175 rule.enabled = False 176 else: 177 rule.enabled = True 178 179 header_vals = _regex_provider.split(r"\s+", rule.header) 180 # 7 is the number of expected header fields 181 if len(header_vals) != 7: # noqa: PLR2004 182 return None 183 ( 184 rule.action, 185 rule.proto, 186 rule.source_addr, 187 rule.source_port, 188 rule.direction, 189 rule.dest_addr, 190 rule.dest_port, 191 ) = header_vals 192 193 if rule.action not in RULE_ACTIONS: 194 return None 195 196 options = m.group("options") 197 198 __add_options_to_rule(text, rule, options) 199 200 return rule
201 202 203def __add_options_to_rule(rule_text: str, rule: "Rule", options: str) -> None: 204 while True: 205 if not options: 206 break 207 index = __find_opt_end(options) 208 if index < 0: 209 raise ParsingError(f"end of option not found: {rule_text}") 210 option = options[:index].strip() 211 options = options[index + 1 :].strip() 212 213 if option.find(":") > -1: 214 name, val = [x.strip() for x in option.split(":", 1)] 215 else: 216 name = option 217 val = None 218 219 __add_option_to_rule(rule, name, val) 220 221 222def __add_option_to_rule( # noqa: C901, PLR0912 223 rule: "Rule", 224 name: str, 225 val: Optional[str], 226) -> None: 227 if val is not None and not isinstance( 228 val, 229 str, 230 ): # pyright: ignore[reportUnnecessaryIsInstance] 231 raise ParsingError(f"Invalid option value type ({type(val)}): {val}") 232 rule.add_option(name, val) 233 234 if name in ["gid", "sid", "rev"]: 235 try: 236 setattr(rule, name, int(val)) # pyright: ignore[reportArgumentType] 237 except ValueError: 238 raise ParsingError(f"Failed to convert {name} value to int: {val}") 239 elif name == "metadata": 240 if not isinstance(val, str): 241 raise ParsingError(f"Invalid metadata value type ({type(val)}): {val}") 242 rule.add_metadata_options([v.strip() for v in val.split(",")]) 243 elif name == "flowbits": 244 if not isinstance(val, str): 245 raise ParsingError(f"Invalid flowbits value type ({type(val)}): {val}") 246 rule.add_flowbits_option(val) 247 elif name == "reference": 248 if not isinstance(val, str): 249 raise ParsingError(f"Invalid reference value type ({type(val)}): {val}") 250 rule.add_reference_option(val) 251 elif name == "msg": 252 if not isinstance(val, str): 253 raise ParsingError(f"Invalid msg value type ({type(val)}): {val}") 254 if val.startswith('"') and val.endswith('"'): 255 val = val[1:-1] 256 setattr(rule, name, val) 257 else: 258 setattr(rule, name, val) 259 260 261def __find_opt_end(options: str) -> int: 262 """Find the end of an option (;) handling escapes.""" 263 offset = 0 264 265 while True: 266 i = options[offset:].find(";") 267 if options[offset + i - 1] == "\\": 268 offset += 2 269 else: 270 return offset + i