1"""Module replacing idstools.rule and providing limited but similar functionality.
2
3This module is inspired by and mostly uses the same interface Python idstools package. (https://github.com/jasonish/py-idstools)
4"""
5
6import inspect
7from collections.abc import Hashable, Sequence
8from dataclasses import dataclass
9from typing import Optional
10
11from suricata_check.utils.regex_provider import get_regex_provider
12
13__all__ = ["ParsingError", "Rule", "RuleOption", "parse"]
14
15_regex_provider = get_regex_provider()
16
17RULE_PATTERN = _regex_provider.compile(
18 r"^(?P<enabled>#)*[\s#]*(?P<raw>(?P<header>[^()]+)\((?P<options>.*)\)$)",
19)
20
21RULE_ACTIONS = (
22 "alert",
23 "config",
24 "log",
25 "pass",
26 "activate",
27 "dynamic",
28 "drop",
29 "reject",
30 "sdrop",
31)
32
33
[docs]
34@dataclass
35class RuleOption:
36 """Class representing a rule option."""
37
38 name: str
39 value: Optional[str] = None
40
41 def _to_dict(self: "RuleOption") -> dict[str, Hashable]:
42 """Returns the RuleOption represented as a dictionary."""
43 return {
44 "name": self.name,
45 "value": self.value,
46 }
47
[docs]
48 def __hash__(self) -> int:
49 """Returns a unique hash that can be used as a fingerprint for the rule option."""
50 return hash(tuple(sorted(self._to_dict().items())))
51
52
[docs]
53@dataclass
54class Rule:
55 """Class representing a rule."""
56
57 raw: str
58 header: str
59 enabled: bool
60 action: str
61 proto: str
62 source_addr: str
63 source_port: str
64 direction: str
65 dest_addr: str
66 dest_port: str
67 options: tuple[RuleOption, ...] = ()
68 metadata: tuple[str, ...] = ()
69 flowbits: tuple[str, ...] = ()
70 references: tuple[str, ...] = ()
71
72 def __init__(self, *args: tuple, **kwargs: dict) -> None:
73 """Private Init function.
74
75 Use suricata_check.utils.rule.parse() instead to create Rule instances.
76 """
77 if inspect.stack()[1].function != "parse":
78 raise RuntimeError(
79 "Rule instances must be created using suricata_check.utils.rule.parse()",
80 )
81 super().__init__(*args, **kwargs)
82
[docs]
83 def __str__(self) -> str:
84 """Returns the raw rule as a string."""
85 return self.raw
86
[docs]
87 def add_option(self, name: str, value: Optional[str]) -> None:
88 """Adds an option in the rule's options list."""
89 if not isinstance(name, str): # pyright: ignore[reportUnnecessaryIsInstance]
90 raise TypeError("Option name must be a string")
91 if value is not None and not isinstance(
92 value,
93 str,
94 ): # pyright: ignore[reportUnnecessaryIsInstance]
95 raise TypeError("Option value must be a string")
96
97 self.options = (*self.options, RuleOption(name=name, value=value))
98
107
117
[docs]
118 def add_flowbits_option(self, value: str) -> None:
119 """Adds a flowbits option in the rule's flowbits list."""
120 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance]
121 raise TypeError("Flowbits option value must be a string")
122 self.flowbits = (*self.flowbits, value)
123
[docs]
124 def add_reference_option(self, value: str) -> None:
125 """Adds a reference option in the rule's references list."""
126 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance]
127 raise TypeError("Reference option value must be a string")
128 self.references = (*self.references, value)
129
130 def _to_dict(self: "Rule") -> dict[str, Hashable]:
131 """Returns the Rule represented as a dictionary."""
132 return {
133 "raw": self.raw,
134 "header": self.header,
135 "enabled": self.enabled,
136 "action": self.action,
137 "proto": self.proto,
138 "source_addr": self.source_addr,
139 "source_port": self.source_port,
140 "direction": self.direction,
141 "dest_addr": self.dest_addr,
142 "dest_port": self.dest_port,
143 "options": self.options,
144 "metadata": self.metadata,
145 "flowbits": self.flowbits,
146 }
147
[docs]
148 def __hash__(self) -> int:
149 """Returns a unique hash that can be used as a fingerprint for the rule."""
150 return hash(tuple(sorted(self._to_dict().items())))
151
152
[docs]
153class ParsingError(RuntimeError):
154 """Raised when a rule cannot be parsed by suricata-check.
155
156 Most likely, such a rule is also an invalid Suricata rule.
157 """
158
159 def __init__(self: "ParsingError", message: str) -> None:
160 """Initializes the `ParsingError` with the raw rule as message."""
161 super().__init__(message)
162
163
[docs]
164def parse(buffer: str) -> Optional["Rule"]:
165 """Parse a rule stringand return a wrapped `Rule` instance.
166
167 Returns None when the text could not be parsed as a rule.
168
169 :param buffer: A string containing a single Suricata-like rule
170
171 :returns: An instance of of `Rule` representing the parsed rule
172 """
173 text = buffer.strip()
174
175 m = RULE_PATTERN.match(text)
176 if not m:
177 return None
178
179 rule = Rule()
180 rule.raw = m.group("raw").strip()
181 rule.header = m.group("header").strip()
182
183 if m.group("enabled") == "#":
184 rule.enabled = False
185 else:
186 rule.enabled = True
187
188 header_vals = _regex_provider.split(r"\s+", rule.header)
189 # 7 is the number of expected header fields
190 if len(header_vals) != 7: # noqa: PLR2004
191 return None
192 (
193 rule.action,
194 rule.proto,
195 rule.source_addr,
196 rule.source_port,
197 rule.direction,
198 rule.dest_addr,
199 rule.dest_port,
200 ) = header_vals
201
202 if rule.action not in RULE_ACTIONS:
203 return None
204
205 options = m.group("options")
206
207 __add_options_to_rule(rule, options)
208
209 return rule
210
211
212def __add_options_to_rule(rule: "Rule", options: str) -> None:
213 while True:
214 if not options:
215 break
216 index = __find_opt_end(options)
217 if index < 0:
218 raise ParsingError(f"end of option not found: {rule.raw}")
219 option = options[:index].strip()
220 options = options[index + 1 :].strip()
221
222 if option.find(":") > -1:
223 name, val = [x.strip() for x in option.split(":", 1)]
224 else:
225 name = option
226 val = None
227
228 __add_option_to_rule(rule, name, val)
229
230
231def __add_metadata_options_to_rule(rule: "Rule", options: str) -> None:
232 while True:
233 if not options:
234 break
235 index = __find_metadata_opt_end(options)
236 option = options[:index].strip()
237 options = options[index + 1 :].strip()
238
239 rule.add_metadata_option(option)
240
241
242def __add_option_to_rule( # noqa: C901, PLR0912
243 rule: "Rule",
244 name: str,
245 val: Optional[str],
246) -> None:
247 if val is not None and not isinstance(
248 val,
249 str,
250 ): # pyright: ignore[reportUnnecessaryIsInstance]
251 raise ParsingError(f"Invalid option value type ({type(val)}): {val}")
252 rule.add_option(name, val)
253
254 if name in ["gid", "sid", "rev"]:
255 try:
256 setattr(rule, name, int(val)) # pyright: ignore[reportArgumentType]
257 except ValueError:
258 raise ParsingError(f"Failed to convert {name} value to int: {val}")
259 elif name == "metadata":
260 if not isinstance(val, str):
261 raise ParsingError(f"Invalid metadata value type ({type(val)}): {val}")
262 __add_metadata_options_to_rule(rule, val)
263 elif name == "flowbits":
264 if not isinstance(val, str):
265 raise ParsingError(f"Invalid flowbits value type ({type(val)}): {val}")
266 rule.add_flowbits_option(val)
267 elif name == "reference":
268 if not isinstance(val, str):
269 raise ParsingError(f"Invalid reference value type ({type(val)}): {val}")
270 rule.add_reference_option(val)
271 elif name == "msg":
272 if not isinstance(val, str):
273 raise ParsingError(f"Invalid msg value type ({type(val)}): {val}")
274 if val.startswith('"') and val.endswith('"'):
275 val = val[1:-1]
276 setattr(rule, name, val)
277 else:
278 setattr(rule, name, val)
279
280
281def __find_opt_end(options: str) -> int:
282 """Find the end of an option (;) handling escapes."""
283 offset = 0
284
285 while True:
286 i = options[offset:].find(";")
287 if options[offset + i - 1] == "\\":
288 offset += 2
289 else:
290 return offset + i
291
292
293def __find_metadata_opt_end(options: str) -> int:
294 """Find the end of a metadata option (,) handling quotes."""
295 offset = 0
296
297 while True:
298 i = options[offset:].find(",")
299 if i == -1:
300 i = len(options) + 1
301 if options[0 : offset + i - 1].count('"') == 1:
302 offset += i + 1
303 else:
304 return offset + i