1"""Module replacing idstools.rule and providing limited but similar functionality.
2
3This module is inspired by and mostly uses the same interface Python idstools package. (https://github.com/jasonish/py-idstools)
4"""
5
6import inspect
7from collections.abc import Hashable, Sequence
8from dataclasses import dataclass
9from typing import Optional
10
11from suricata_check.utils.regex_provider import get_regex_provider
12
13__all__ = ["ParsingError", "Rule", "RuleOption", "parse"]
14
15_regex_provider = get_regex_provider()
16
17RULE_PATTERN = _regex_provider.compile(
18 r"^(?P<enabled>#)*[\s#]*(?P<raw>(?P<header>[^()]+)\((?P<options>.*)\)$)",
19)
20
21RULE_ACTIONS = (
22 "alert",
23 "config",
24 "log",
25 "pass",
26 "activate",
27 "dynamic",
28 "drop",
29 "reject",
30 "sdrop",
31)
32
33
[docs]
34@dataclass
35class RuleOption:
36 """Class representing a rule option."""
37
38 name: str
39 value: Optional[str] = None
40
41 def _to_dict(self: "RuleOption") -> dict[str, Hashable]:
42 """Returns the RuleOption represented as a dictionary."""
43 return {
44 "name": self.name,
45 "value": self.value,
46 }
47
[docs]
48 def __hash__(self) -> int:
49 """Returns a unique hash that can be used as a fingerprint for the rule option."""
50 return hash(tuple(sorted(self._to_dict().items())))
51
52
[docs]
53@dataclass
54class Rule:
55 """Class representing a rule."""
56
57 raw: str
58 header: str
59 enabled: bool
60 action: str
61 proto: str
62 source_addr: str
63 source_port: str
64 direction: str
65 dest_addr: str
66 dest_port: str
67 options: tuple[RuleOption, ...] = ()
68 metadata: tuple[str, ...] = ()
69 flowbits: tuple[str, ...] = ()
70 references: tuple[str, ...] = ()
71
72 def __init__(self, *args: tuple, **kwargs: dict) -> None:
73 """Private Init function.
74
75 Use suricata_check.utils.rule.parse() instead to create Rule instances.
76 """
77 if inspect.stack()[1].function != "parse":
78 raise RuntimeError(
79 "Rule instances must be created using suricata_check.utils.rule.parse()",
80 )
81 super().__init__(*args, **kwargs)
82
[docs]
83 def __str__(self) -> str:
84 """Returns the raw rule as a string."""
85 return self.raw
86
[docs]
87 def add_option(self, name: str, value: Optional[str]) -> None:
88 """Adds an option in the rule's options list."""
89 if not isinstance(name, str): # pyright: ignore[reportUnnecessaryIsInstance]
90 raise TypeError("Option name must be a string")
91 if value is not None and not isinstance(
92 value,
93 str,
94 ): # pyright: ignore[reportUnnecessaryIsInstance]
95 raise TypeError("Option value must be a string")
96
97 self.options = (*self.options, RuleOption(name=name, value=value))
98
108
[docs]
109 def add_flowbits_option(self, value: str) -> None:
110 """Adds a flowbits option in the rule's flowbits list."""
111 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance]
112 raise TypeError("Flowbits option value must be a string")
113 self.flowbits = (*self.flowbits, value)
114
[docs]
115 def add_reference_option(self, value: str) -> None:
116 """Adds a reference option in the rule's references list."""
117 if not isinstance(value, str): # pyright: ignore[reportUnnecessaryIsInstance]
118 raise TypeError("Reference option value must be a string")
119 self.references = (*self.references, value)
120
121 def _to_dict(self: "Rule") -> dict[str, Hashable]:
122 """Returns the Rule represented as a dictionary."""
123 return {
124 "raw": self.raw,
125 "header": self.header,
126 "enabled": self.enabled,
127 "action": self.action,
128 "proto": self.proto,
129 "source_addr": self.source_addr,
130 "source_port": self.source_port,
131 "direction": self.direction,
132 "dest_addr": self.dest_addr,
133 "dest_port": self.dest_port,
134 "options": self.options,
135 "metadata": self.metadata,
136 "flowbits": self.flowbits,
137 }
138
[docs]
139 def __hash__(self) -> int:
140 """Returns a unique hash that can be used as a fingerprint for the rule."""
141 return hash(tuple(sorted(self._to_dict().items())))
142
143
[docs]
144class ParsingError(RuntimeError):
145 """Raised when a rule cannot be parsed by suricata-check.
146
147 Most likely, such a rule is also an invalid Suricata rule.
148 """
149
150 def __init__(self: "ParsingError", message: str) -> None:
151 """Initializes the `ParsingError` with the raw rule as message."""
152 super().__init__(message)
153
154
[docs]
155def parse(buffer: str) -> Optional["Rule"]:
156 """Parse a rule stringand return a wrapped `Rule` instance.
157
158 Returns None when the text could not be parsed as a rule.
159
160 :param buffer: A string containing a single Suricata-like rule
161
162 :returns: An instance of of `Rule` representing the parsed rule
163 """
164 text = buffer.strip()
165
166 m = RULE_PATTERN.match(text)
167 if not m:
168 return None
169
170 rule = Rule()
171 rule.raw = m.group("raw").strip()
172 rule.header = m.group("header").strip()
173
174 if m.group("enabled") == "#":
175 rule.enabled = False
176 else:
177 rule.enabled = True
178
179 header_vals = _regex_provider.split(r"\s+", rule.header)
180 # 7 is the number of expected header fields
181 if len(header_vals) != 7: # noqa: PLR2004
182 return None
183 (
184 rule.action,
185 rule.proto,
186 rule.source_addr,
187 rule.source_port,
188 rule.direction,
189 rule.dest_addr,
190 rule.dest_port,
191 ) = header_vals
192
193 if rule.action not in RULE_ACTIONS:
194 return None
195
196 options = m.group("options")
197
198 __add_options_to_rule(text, rule, options)
199
200 return rule
201
202
203def __add_options_to_rule(rule_text: str, rule: "Rule", options: str) -> None:
204 while True:
205 if not options:
206 break
207 index = __find_opt_end(options)
208 if index < 0:
209 raise ParsingError(f"end of option not found: {rule_text}")
210 option = options[:index].strip()
211 options = options[index + 1 :].strip()
212
213 if option.find(":") > -1:
214 name, val = [x.strip() for x in option.split(":", 1)]
215 else:
216 name = option
217 val = None
218
219 __add_option_to_rule(rule, name, val)
220
221
222def __add_option_to_rule( # noqa: C901, PLR0912
223 rule: "Rule",
224 name: str,
225 val: Optional[str],
226) -> None:
227 if val is not None and not isinstance(
228 val,
229 str,
230 ): # pyright: ignore[reportUnnecessaryIsInstance]
231 raise ParsingError(f"Invalid option value type ({type(val)}): {val}")
232 rule.add_option(name, val)
233
234 if name in ["gid", "sid", "rev"]:
235 try:
236 setattr(rule, name, int(val)) # pyright: ignore[reportArgumentType]
237 except ValueError:
238 raise ParsingError(f"Failed to convert {name} value to int: {val}")
239 elif name == "metadata":
240 if not isinstance(val, str):
241 raise ParsingError(f"Invalid metadata value type ({type(val)}): {val}")
242 rule.add_metadata_options([v.strip() for v in val.split(",")])
243 elif name == "flowbits":
244 if not isinstance(val, str):
245 raise ParsingError(f"Invalid flowbits value type ({type(val)}): {val}")
246 rule.add_flowbits_option(val)
247 elif name == "reference":
248 if not isinstance(val, str):
249 raise ParsingError(f"Invalid reference value type ({type(val)}): {val}")
250 rule.add_reference_option(val)
251 elif name == "msg":
252 if not isinstance(val, str):
253 raise ParsingError(f"Invalid msg value type ({type(val)}): {val}")
254 if val.startswith('"') and val.endswith('"'):
255 val = val[1:-1]
256 setattr(rule, name, val)
257 else:
258 setattr(rule, name, val)
259
260
261def __find_opt_end(options: str) -> int:
262 """Find the end of an option (;) handling escapes."""
263 offset = 0
264
265 while True:
266 i = options[offset:].find(";")
267 if options[offset + i - 1] == "\\":
268 offset += 2
269 else:
270 return offset + i