"""Module replacing idstools.rule and providing limited but similar functionality.

This module is inspired by and mostly uses the same interface as the Python idstools package. (https://github.com/jasonish/py-idstools)
"""
5
6import inspect
7from collections.abc import Hashable, Sequence
8from dataclasses import dataclass
9from typing import Optional
10
11from suricata_check.utils.regex_provider import get_regex_provider
12
_regex_provider = get_regex_provider()

# Pattern for a single rule line, optionally commented out. The adjacent raw
# strings below are concatenated into one regular expression.
RULE_PATTERN = _regex_provider.compile(
    r"^(?P<enabled>#)*"  # leading '#'; parse() treats a captured '#' as "disabled"
    r"[\s#]*"  # any further whitespace or '#' before the rule text
    r"(?P<raw>"  # the rule text, from the header up to the closing ')'
    r"(?P<header>[^()]+)"  # everything before the first parenthesis
    r"\((?P<options>.*)\)$"  # option list between the outer parentheses
    r")"
)
18
# Actions accepted as the first header field of a rule; parse() returns None
# for a rule whose action is not listed here.
RULE_ACTIONS = (
    "alert", "config", "log", "pass", "activate",
    "dynamic", "drop", "reject", "sdrop",
)  # fmt: skip
30
31
@dataclass
class RuleOption:
    """A single rule option: a name and an optional value."""

    name: str
    value: Optional[str] = None

    def _to_dict(self: "RuleOption") -> dict[str, Hashable]:
        """Returns the option's fields as a plain dictionary."""
        return dict(name=self.name, value=self.value)

    def __hash__(self) -> int:
        """Returns a hash computed from the option's fields, ordered by field name."""
        ordered_items = sorted(self._to_dict().items())
        return hash(tuple(ordered_items))
49
50
@dataclass
class Rule:
    """Class representing a rule.

    Instances are meant to be created through ``parse()`` only; the constructor
    rejects any caller whose function name is not ``parse``. The ``options``,
    ``metadata``, ``flowbits`` and ``references`` tuples are extended through the
    ``add_*`` methods, each of which rebinds the attribute to a new tuple.
    """

    raw: str
    header: str
    enabled: bool
    action: str
    proto: str
    source_addr: str
    source_port: str
    direction: str
    dest_addr: str
    dest_port: str
    options: tuple["RuleOption", ...] = ()
    metadata: tuple[str, ...] = ()
    flowbits: tuple[str, ...] = ()
    references: tuple[str, ...] = ()

    def __init__(self, *args: tuple, **kwargs: dict) -> None:
        """Private Init function.

        Use suricata_check.utils.rule.parse() instead to create Rule instances.

        :raises RuntimeError: If the calling function is not named ``parse``
        """
        # Only the direct caller's name is needed, so look at that single frame
        # instead of materialising the whole call stack with inspect.stack().
        current = inspect.currentframe()
        caller = current.f_back if current is not None else None
        del current  # do not keep this frame alive through its own locals
        if caller is None or caller.f_code.co_name != "parse":
            raise RuntimeError(
                "Rule instances must be created using suricata_check.utils.rule.parse()"
            )
        super().__init__(*args, **kwargs)

    def add_option(self, name: str, value: Optional[str]) -> None:
        """Adds an option in the rule's options list.

        :raises TypeError: If ``name`` is not a string, or ``value`` is neither
            a string nor None
        """
        if not isinstance(name, str):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError("Option name must be a string")
        if value is not None and not isinstance(
            value, str
        ):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError("Option value must be a string")

        self.options = (*self.options, RuleOption(name=name, value=value))

    def add_metadata_options(self, values: Sequence[str]) -> None:
        """Adds metadata options in the rule's metadata list.

        All values are validated before the list is changed, so a failing call
        leaves ``metadata`` untouched.

        :raises TypeError: If any of the values is not a string
        """
        for value in values:
            if not isinstance(value, str):  # pyright: ignore[reportUnnecessaryIsInstance]
                raise TypeError("Metadata option value must be a string")
        self.metadata = (*self.metadata, *values)

    def add_flowbits_option(self, value: str) -> None:
        """Adds a flowbits option in the rule's flowbits list.

        :raises TypeError: If ``value`` is not a string
        """
        if not isinstance(value, str):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError("Flowbits option value must be a string")
        self.flowbits = (*self.flowbits, value)

    def add_reference_option(self, value: str) -> None:
        """Adds a reference option in the rule's references list.

        :raises TypeError: If ``value`` is not a string
        """
        if not isinstance(value, str):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError("Reference option value must be a string")
        self.references = (*self.references, value)

    def _to_dict(self: "Rule") -> dict[str, Hashable]:
        """Returns the Rule represented as a dictionary."""
        # NOTE(review): `references` is not part of this dictionary (and hence
        # not part of the hash) -- confirm whether that is intentional.
        return {
            "raw": self.raw,
            "header": self.header,
            "enabled": self.enabled,
            "action": self.action,
            "proto": self.proto,
            "source_addr": self.source_addr,
            "source_port": self.source_port,
            "direction": self.direction,
            "dest_addr": self.dest_addr,
            "dest_port": self.dest_port,
            "options": self.options,
            "metadata": self.metadata,
            "flowbits": self.flowbits,
        }

    def __hash__(self) -> int:
        """Returns a hash that can be used as a fingerprint for the rule.

        The hash is computed over the items of ``_to_dict()``, ordered by key.
        """
        return hash(tuple(sorted(self._to_dict().items())))
134
135
class ParsingError(RuntimeError):
    """Signals that suricata-check could not parse a rule.

    Most likely, such a rule is also an invalid Suricata rule.
    """

    def __init__(self: "ParsingError", message: str) -> None:
        """Creates the `ParsingError` carrying `message` as its only argument."""
        RuntimeError.__init__(self, message)
145
146
def parse(buffer: str) -> Optional["Rule"]:
    """Parse a rule string and return a wrapped `Rule` instance.

    Returns None when the text could not be parsed as a rule: when it does not
    match `RULE_PATTERN`, when its header does not consist of exactly seven
    whitespace-separated fields, or when its action is not in `RULE_ACTIONS`.

    :param buffer: A string containing a single Suricata-like rule

    :returns: An instance of `Rule` representing the parsed rule, or None

    :raises ParsingError: If an option of the rule cannot be processed, e.g.
        because the end of the option cannot be found
    """
    stripped = buffer.strip()

    matched = RULE_PATTERN.match(stripped)
    if not matched:
        return None

    # Rule() must be called directly from this function: its constructor only
    # accepts callers named "parse".
    rule = Rule()
    rule.raw = matched.group("raw").strip()
    rule.header = matched.group("header").strip()
    # The "enabled" group holds "#" when the line starts with a comment marker.
    rule.enabled = matched.group("enabled") != "#"

    header_attributes = (
        "action",
        "proto",
        "source_addr",
        "source_port",
        "direction",
        "dest_addr",
        "dest_port",
    )
    header_fields = _regex_provider.split(r"\s+", rule.header)
    if len(header_fields) != len(header_attributes):
        return None
    for attribute, field_value in zip(header_attributes, header_fields):
        setattr(rule, attribute, field_value)

    if rule.action not in RULE_ACTIONS:
        return None

    __add_options_to_rule(stripped, rule, matched.group("options"))

    return rule
193
194
def __add_options_to_rule(rule_text: str, rule: "Rule", options: str) -> None:
    """Split the option string into single options and add each one to the rule.

    Options are consumed from the front of ``options`` one at a time, each ending
    at the position reported by ``__find_opt_end``. An option containing ``:`` is
    split at the first colon into a name and a value; otherwise the whole option
    is the name and the value is None.

    :param rule_text: The full rule text, only used in the error message
    :param rule: The rule receiving the options
    :param options: The text between the rule's outer parentheses

    :raises ParsingError: If the end of an option cannot be found
    """
    remaining = options
    while remaining:
        end = __find_opt_end(remaining)
        if end < 0:
            raise ParsingError(f"end of option not found: {rule_text}")
        current = remaining[:end].strip()
        remaining = remaining[end + 1 :].strip()

        key, colon, raw_value = current.partition(":")
        if colon:
            __add_option_to_rule(rule, key.strip(), raw_value.strip())
        else:
            __add_option_to_rule(rule, current, None)
212
213
def __add_option_to_rule(  # noqa: C901, PLR0912
    rule: "Rule", name: str, val: Optional[str]
) -> None:
    """Add a single parsed option to the rule.

    The option is always appended to the rule's option list. In addition:

    * ``gid``, ``sid`` and ``rev`` are stored on the rule as integers;
    * ``metadata`` is split on commas and added to the rule's metadata;
    * ``flowbits`` and ``reference`` are added to the respective lists;
    * ``msg`` is stored on the rule with one pair of surrounding double quotes
      removed, if present;
    * any other option is stored as an attribute named after the option.

    :param rule: The rule to update
    :param name: The option name
    :param val: The option value, or None for an option without a value

    :raises ParsingError: If the value has an unexpected type, is missing where
        one is required, or cannot be converted to an integer for
        ``gid``/``sid``/``rev``
    """
    if val is not None and not isinstance(
        val, str
    ):  # pyright: ignore[reportUnnecessaryIsInstance]
        raise ParsingError(f"Invalid option value type ({type(val)}): {val}")
    rule.add_option(name, val)

    if name in ("gid", "sid", "rev"):
        try:
            setattr(rule, name, int(val))  # pyright: ignore[reportArgumentType]
        except (TypeError, ValueError) as err:
            # int(None) raises TypeError (option given without a value);
            # a non-numeric string raises ValueError.
            raise ParsingError(
                f"Failed to convert {name} value to int: {val}"
            ) from err
    elif name == "metadata":
        if not isinstance(val, str):
            raise ParsingError(f"Invalid metadata value type ({type(val)}): {val}")
        rule.add_metadata_options([v.strip() for v in val.split(",")])
    elif name == "flowbits":
        if not isinstance(val, str):
            raise ParsingError(f"Invalid flowbits value type ({type(val)}): {val}")
        rule.add_flowbits_option(val)
    elif name == "reference":
        if not isinstance(val, str):
            raise ParsingError(f"Invalid reference value type ({type(val)}): {val}")
        rule.add_reference_option(val)
    elif name == "msg":
        if not isinstance(val, str):
            raise ParsingError(f"Invalid msg value type ({type(val)}): {val}")
        if val.startswith('"') and val.endswith('"'):
            val = val[1:-1]
        setattr(rule, name, val)
    else:
        # NOTE(review): the option name comes straight from the rule text, so an
        # option named like an existing attribute of the rule would overwrite it.
        setattr(rule, name, val)
248
249
def __find_opt_end(options: str) -> int:
    """Find the end of an option (;) handling escapes.

    A ``;`` that is directly preceded by a backslash is treated as escaped and
    skipped (this also applies when that backslash is itself preceded by
    another backslash).

    :param options: The remaining option text to search

    :returns: The index of the first unescaped ``;``, or -1 if there is none
    """
    search_from = 0

    while True:
        position = options.find(";", search_from)
        if position < 0:
            return -1
        # position > 0 guards against index -1 wrapping around to the last
        # character when the ';' is the very first character.
        if position > 0 and options[position - 1] == "\\":
            search_from = position + 1
            continue
        return position