1"""`PrincipleChecker`."""
2
3import logging
4from typing import Optional
5
6import idstools.rule
7
8from suricata_check.checkers.interface.checker import CheckerInterface
9from suricata_check.checkers.principle._utils import get_message
10from suricata_check.utils.checker import (
11 count_rule_options,
12 get_rule_option,
13 get_rule_options,
14 is_rule_option_equal_to,
15 is_rule_option_equal_to_regex,
16 is_rule_option_set,
17 is_rule_suboption_set,
18)
19from suricata_check.utils.checker_typing import ISSUES_TYPE, Issue
20from suricata_check.utils.regex import (
21 ALL_DETECTION_KEYWORDS,
22 BUFFER_KEYWORDS,
23 CONTENT_KEYWORDS,
24 IP_ADDRESS_REGEX,
25 OTHER_PAYLOAD_KEYWORDS,
26 SIZE_KEYWORDS,
27 get_options_regex,
28 get_regex_provider,
29 get_rule_body,
30)
31
32_regex_provider = get_regex_provider()
33
34_BITS_ISSET_REGEX = _regex_provider.compile(r"^\s*isset\s*,.*$")
35_BITS_ISNOTSET_REGEX = _regex_provider.compile(r"^\s*isnotset\s*,.*$")
36_FLOWINT_ISSET_REGEX = _regex_provider.compile(r"^.*,\s*isset\s*,.*$")
37_FLOWINT_ISNOTSET_REGEX = _regex_provider.compile(r"^.*,\s*isnotset\s*,.*$")
38_THRESHOLD_LIMITED_REGEX = _regex_provider.compile(r"^.*type\s+(limit|both).*$")
39_FLOWBITS_ISNOTSET_REGEX = _regex_provider.compile(r"^\s*isnotset.*$")
40_HTTP_URI_QUERY_PARAMETER_REGEX = _regex_provider.compile(
41 rf"^\(.*\s+http\.uri\s*;\s*content\s*:\s*\"[^\"]*\?([^\"]|\\\")+\"\s*;((?!.*{get_options_regex(CONTENT_KEYWORDS).pattern}).*)|((?!.*{get_options_regex(CONTENT_KEYWORDS).pattern}).*\s+{get_options_regex(BUFFER_KEYWORDS).pattern}\s*;.*)\)$"
42)
43_PROXY_MSG_REGEX = _regex_provider.compile(
44 r"^.*(Suspicious).*$", flags=_regex_provider.IGNORECASE
45)
46_SPECIFIC_MSG_REGEX = _regex_provider.compile(
47 r"^.*(CVE|Vulnerability).*$", flags=_regex_provider.IGNORECASE
48)
49
50
[docs]
51class PrincipleChecker(CheckerInterface):
52 """The `PrincipleChecker` contains several checks based on the Ruling the Unruly paper and target specificity and coverage.
53
54 Codes P000-P009 report on non-adherence to rule design principles.
55
56 Specifically, the `PrincipleChecker` checks for the following:
57 P000: No Limited Proxy, the rule does not detect a characteristic that relates directly to a malicious action,
58 making it potentially noisy.
59
60 P001: No Successful Malicious Action, the rule does not distinguish between successful and unsuccessful malicious
61 actions, making it potentially noisy.
62
63 P002: No Alert Throttling, the rule does not utilize the threshold limit option` to prevent alert flooding,
64 making it potentially noisy.
65
66 P003: No Exceptions, the rule does not include any exceptions for commom benign traffic,
67 making it potentially noisy.
68
69 P004: No Generalized Characteristic, the rule does detect a characteristic that is so specific
70 that it is unlikely generalize.
71
72 P005: No Generalized Position, the rule does detect the characteristic in a fixed position
73 that and is unlikely to generalize as a result.
74 """
75
76 codes = {
77 "P000": {"severity": logging.INFO},
78 "P001": {"severity": logging.INFO},
79 "P002": {"severity": logging.INFO},
80 "P003": {"severity": logging.INFO},
81 "P004": {"severity": logging.INFO},
82 "P005": {"severity": logging.INFO},
83 }
84
85 def _check_rule(
86 self: "PrincipleChecker",
87 rule: idstools.rule.Rule,
88 ) -> ISSUES_TYPE:
89 issues: ISSUES_TYPE = []
90
91 if count_rule_options(
92 rule, ALL_DETECTION_KEYWORDS
93 ) == 0 or is_rule_option_equal_to_regex(rule, "msg", _PROXY_MSG_REGEX):
94 issues.append(
95 Issue(
96 code="P000",
97 message=get_message("P000"),
98 ),
99 )
100
101 if (
102 self.__is_rule_initiated_internally(rule) is False
103 and self.__does_rule_account_for_server_response(rule) is False
104 and self.__does_rule_account_for_internal_content(rule) is False
105 and self.__is_rule_stateful(rule) is False
106 ):
107 issues.append(
108 Issue(
109 code="P001",
110 message=get_message("P001"),
111 ),
112 )
113
114 if not self.__is_rule_threshold_limited(rule):
115 issues.append(
116 Issue(
117 code="P002",
118 message=get_message("P002"),
119 ),
120 )
121
122 if not self.__does_rule_have_exceptions(rule):
123 issues.append(
124 Issue(
125 code="P003",
126 message=get_message("P003"),
127 ),
128 )
129
130 if (
131 count_rule_options(rule, "content") == 0
132 and not count_rule_options(
133 rule,
134 set(SIZE_KEYWORDS)
135 .union(CONTENT_KEYWORDS)
136 .union(OTHER_PAYLOAD_KEYWORDS),
137 )
138 > 1
139 ) or (
140 is_rule_option_equal_to_regex(rule, "msg", _SPECIFIC_MSG_REGEX)
141 and not is_rule_option_set(rule, "pcre")
142 ):
143 issues.append(
144 Issue(
145 code="P004",
146 message=get_message("P004"),
147 ),
148 )
149
150 if self.__has_fixed_http_uri_query_parameter_location(
151 rule
152 ) or self.__has_single_match_at_fixed_location(rule):
153 issues.append(
154 Issue(
155 code="P005",
156 message=get_message("P005"),
157 ),
158 )
159
160 return issues
161
162 @staticmethod
163 def __is_rule_initiated_internally(
164 rule: idstools.rule.Rule,
165 ) -> Optional[bool]:
166 if get_rule_option(rule, "proto") in ("ip",):
167 return None
168
169 dest_addr = get_rule_option(rule, "dest_addr")
170 assert dest_addr is not None
171 if (
172 dest_addr not in ("any", "$EXTERNAL_NET")
173 and IP_ADDRESS_REGEX.match(dest_addr) is None
174 ):
175 if is_rule_suboption_set(
176 rule, "flow", "from_server"
177 ) or is_rule_suboption_set(rule, "flow", "to_client"):
178 return True
179
180 source_addr = get_rule_option(rule, "source_addr")
181 assert source_addr is not None
182 if (
183 source_addr not in ("any", "$EXTERNAL_NET")
184 and IP_ADDRESS_REGEX.match(source_addr) is None
185 ):
186 if is_rule_suboption_set(
187 rule, "flow", "to_server"
188 ) or is_rule_suboption_set(rule, "flow", "from_client"):
189 return True
190 if is_rule_option_set(rule, "dns.query") or is_rule_option_set(
191 rule,
192 "dns_query",
193 ):
194 return True
195
196 return False
197
198 @staticmethod
199 def __does_rule_account_for_server_response(
200 rule: idstools.rule.Rule,
201 ) -> Optional[bool]:
202 if get_rule_option(rule, "proto") in ("ip",):
203 return None
204
205 if is_rule_suboption_set(rule, "flow", "from_server") or is_rule_suboption_set(
206 rule, "flow", "to_client"
207 ):
208 return True
209
210 msg = get_rule_option(rule, "msg")
211 assert msg is not None
212 if "response" in msg.lower():
213 return True
214
215 return False
216
217 @staticmethod
218 def __does_rule_account_for_internal_content(
219 rule: idstools.rule.Rule,
220 ) -> bool:
221 source_addr = get_rule_option(rule, "source_addr")
222 assert source_addr is not None
223 if (
224 source_addr not in ("any", "$EXTERNAL_NET")
225 and IP_ADDRESS_REGEX.match(source_addr) is None
226 ):
227 return True
228
229 return False
230
231 @staticmethod
232 def __is_rule_stateful(
233 rule: idstools.rule.Rule,
234 ) -> Optional[bool]:
235 if (
236 is_rule_option_equal_to_regex(rule, "flowbits", _BITS_ISSET_REGEX)
237 or is_rule_option_equal_to_regex(rule, "flowint", _FLOWINT_ISSET_REGEX)
238 or is_rule_option_equal_to_regex(rule, "xbits", _BITS_ISSET_REGEX)
239 ):
240 return True
241
242 # flowbits.isnotset is used to reduce false positives as well, so it does not neccesarily indicate a stateful rule.
243 if (
244 is_rule_option_equal_to_regex(rule, "flowbits", _BITS_ISNOTSET_REGEX)
245 or is_rule_option_equal_to_regex(rule, "flowint", _FLOWINT_ISNOTSET_REGEX)
246 or is_rule_option_equal_to_regex(rule, "xbits", _BITS_ISNOTSET_REGEX)
247 ):
248 return True
249
250 return False
251
252 @staticmethod
253 def __is_rule_threshold_limited(
254 rule: idstools.rule.Rule,
255 ) -> bool:
256 value = get_rule_option(rule, "threshold")
257
258 if value is None:
259 return False
260
261 if _THRESHOLD_LIMITED_REGEX.match(value) is not None:
262 return True
263
264 return False
265
266 @staticmethod
267 def __does_rule_have_exceptions(
268 rule: idstools.rule.Rule,
269 ) -> bool:
270 positive_matches = 0
271 negative_matches = 0
272
273 for option_value in get_rule_options(rule, CONTENT_KEYWORDS):
274 if option_value is None:
275 continue
276 if option_value.startswith("!"):
277 negative_matches += 1
278 else:
279 positive_matches += 1
280
281 if (
282 positive_matches > 0 and negative_matches > 0
283 ) or is_rule_option_equal_to_regex(
284 rule,
285 "flowbits",
286 _FLOWBITS_ISNOTSET_REGEX,
287 ):
288 return True
289
290 return False
291
292 @staticmethod
293 def __has_fixed_http_uri_query_parameter_location(
294 rule: idstools.rule.Rule,
295 ) -> bool:
296 if count_rule_options(rule, "content") != 1:
297 return False
298
299 body = get_rule_body(rule)
300 if _HTTP_URI_QUERY_PARAMETER_REGEX.match(body) is not None:
301 return True
302
303 return False
304
305 @staticmethod
306 def __has_single_match_at_fixed_location(
307 rule: idstools.rule.Rule,
308 ) -> bool:
309 if (
310 count_rule_options(rule, "content") == 2 # noqa: PLR2004
311 and is_rule_option_set(rule, "http.method")
312 and (
313 is_rule_option_equal_to(rule, "content", "GET")
314 or is_rule_option_equal_to(rule, "content", "POST")
315 )
316 ) and count_rule_options(rule, "content") != 1:
317 return False
318
319 contents = list(
320 set(get_rule_options(rule, "content")).difference(['"GET"', '"POST"'])
321 )
322 if len(contents) != 1:
323 return False
324 content = contents[0]
325
326 if is_rule_option_set(rule, "startswith"):
327 return True
328
329 if content is not None:
330 # -2 to discard quotes
331 length = len(content) - 2
332 else:
333 length = -1
334
335 if (
336 is_rule_option_equal_to(rule, "depth", str(length))
337 or is_rule_option_equal_to(rule, "bsize", str(length))
338 or (
339 is_rule_option_set(rule, "http.uri")
340 and is_rule_option_equal_to(rule, "urilen", str(length))
341 )
342 ):
343 return True
344
345 return False