Source code for suricata_check.checkers.principle.principle

  1"""`PrincipleChecker`."""
  2
  3import logging
  4from typing import Optional
  5
  6import idstools.rule
  7
  8from suricata_check.checkers.interface.checker import CheckerInterface
  9from suricata_check.checkers.principle._utils import get_message
 10from suricata_check.utils.checker import (
 11    count_rule_options,
 12    get_rule_option,
 13    get_rule_options,
 14    is_rule_option_equal_to,
 15    is_rule_option_equal_to_regex,
 16    is_rule_option_set,
 17    is_rule_suboption_set,
 18)
 19from suricata_check.utils.checker_typing import ISSUES_TYPE, Issue
 20from suricata_check.utils.regex import (
 21    ALL_DETECTION_KEYWORDS,
 22    BUFFER_KEYWORDS,
 23    CONTENT_KEYWORDS,
 24    IP_ADDRESS_REGEX,
 25    OTHER_PAYLOAD_KEYWORDS,
 26    SIZE_KEYWORDS,
 27    get_options_regex,
 28    get_regex_provider,
 29    get_rule_body,
 30)
 31
 32_regex_provider = get_regex_provider()
 33
 34_BITS_ISSET_REGEX = _regex_provider.compile(r"^\s*isset\s*,.*$")
 35_BITS_ISNOTSET_REGEX = _regex_provider.compile(r"^\s*isnotset\s*,.*$")
 36_FLOWINT_ISSET_REGEX = _regex_provider.compile(r"^.*,\s*isset\s*,.*$")
 37_FLOWINT_ISNOTSET_REGEX = _regex_provider.compile(r"^.*,\s*isnotset\s*,.*$")
 38_THRESHOLD_LIMITED_REGEX = _regex_provider.compile(r"^.*type\s+(limit|both).*$")
 39_FLOWBITS_ISNOTSET_REGEX = _regex_provider.compile(r"^\s*isnotset.*$")
 40_HTTP_URI_QUERY_PARAMETER_REGEX = _regex_provider.compile(
 41    rf"^\(.*\s+http\.uri\s*;\s*content\s*:\s*\"[^\"]*\?([^\"]|\\\")+\"\s*;((?!.*{get_options_regex(CONTENT_KEYWORDS).pattern}).*)|((?!.*{get_options_regex(CONTENT_KEYWORDS).pattern}).*\s+{get_options_regex(BUFFER_KEYWORDS).pattern}\s*;.*)\)$"
 42)
 43_PROXY_MSG_REGEX = _regex_provider.compile(
 44    r"^.*(Suspicious).*$", flags=_regex_provider.IGNORECASE
 45)
 46_SPECIFIC_MSG_REGEX = _regex_provider.compile(
 47    r"^.*(CVE|Vulnerability).*$", flags=_regex_provider.IGNORECASE
 48)
 49
 50
class PrincipleChecker(CheckerInterface):
    """The `PrincipleChecker` contains several checks based on the Ruling the Unruly paper and target specificity and coverage.

    Codes P000-P005 report on non-adherence to rule design principles.

    Specifically, the `PrincipleChecker` checks for the following:
        P000: No Limited Proxy, the rule does not detect a characteristic that relates directly to a malicious action,
        making it potentially noisy.

        P001: No Successful Malicious Action, the rule does not distinguish between successful and unsuccessful
        malicious actions, making it potentially noisy.

        P002: No Alert Throttling, the rule does not utilize the `threshold` limit option to prevent alert flooding,
        making it potentially noisy.

        P003: No Exceptions, the rule does not include any exceptions for common benign traffic,
        making it potentially noisy.

        P004: No Generalized Characteristic, the rule detects a characteristic that is so specific
        that it is unlikely to generalize.

        P005: No Generalized Position, the rule detects the characteristic in a fixed position
        and is unlikely to generalize as a result.
    """

    codes = {
        "P000": {"severity": logging.INFO},
        "P001": {"severity": logging.INFO},
        "P002": {"severity": logging.INFO},
        "P003": {"severity": logging.INFO},
        "P004": {"severity": logging.INFO},
        "P005": {"severity": logging.INFO},
    }

    def _check_rule(
        self: "PrincipleChecker",
        rule: idstools.rule.Rule,
    ) -> ISSUES_TYPE:
        """Run every principle check on `rule` and return the issues raised, in code order."""
        issues: ISSUES_TYPE = []

        def report(code: str) -> None:
            # Every issue pairs a code with the message registered for that code.
            issues.append(Issue(code=code, message=get_message(code)))

        # P000: no detection keyword at all, or the message itself admits a proxy.
        lacks_detection = count_rule_options(rule, ALL_DETECTION_KEYWORDS) == 0
        if lacks_detection or is_rule_option_equal_to_regex(
            rule, "msg", _PROXY_MSG_REGEX
        ):
            report("P000")

        # P001: raised only when each indicator is exactly `False`; an
        # indicator returning `None` (undetermined) suppresses the issue.
        success_indicators = (
            self.__is_rule_initiated_internally,
            self.__does_rule_account_for_server_response,
            self.__does_rule_account_for_internal_content,
            self.__is_rule_stateful,
        )
        if all(indicator(rule) is False for indicator in success_indicators):
            report("P001")

        # P002: no limiting threshold.
        if not self.__is_rule_threshold_limited(rule):
            report("P002")

        # P003: no exception for benign traffic.
        if not self.__does_rule_have_exceptions(rule):
            report("P003")

        # P004: either no `content` and at most one payload-related option, or
        # a CVE/vulnerability message without a `pcre`.
        too_specific = False
        if count_rule_options(rule, "content") == 0:
            payload_keywords = set(SIZE_KEYWORDS).union(
                CONTENT_KEYWORDS, OTHER_PAYLOAD_KEYWORDS
            )
            too_specific = count_rule_options(rule, payload_keywords) <= 1
        if not too_specific:
            too_specific = is_rule_option_equal_to_regex(
                rule, "msg", _SPECIFIC_MSG_REGEX
            ) and not is_rule_option_set(rule, "pcre")
        if too_specific:
            report("P004")

        # P005: the characteristic is pinned to a fixed position.
        if self.__has_fixed_http_uri_query_parameter_location(rule):
            report("P005")
        elif self.__has_single_match_at_fixed_location(rule):
            report("P005")

        return issues

    @staticmethod
    def __is_rule_initiated_internally(
        rule: idstools.rule.Rule,
    ) -> Optional[bool]:
        """Tell whether the rule matches traffic initiated from a non-external address.

        Returns `None` for rules with protocol `ip`, otherwise `True` or `False`.
        """

        def is_internal(address: str) -> bool:
            # Neither a wildcard / external-net variable nor a literal IP address.
            return (
                address not in ("any", "$EXTERNAL_NET")
                and IP_ADDRESS_REGEX.match(address) is None
            )

        if get_rule_option(rule, "proto") == "ip":
            return None

        # Internal destination combined with server-to-client flow.
        dest_addr = get_rule_option(rule, "dest_addr")
        assert dest_addr is not None
        if is_internal(dest_addr) and (
            is_rule_suboption_set(rule, "flow", "from_server")
            or is_rule_suboption_set(rule, "flow", "to_client")
        ):
            return True

        source_addr = get_rule_option(rule, "source_addr")
        assert source_addr is not None
        if not is_internal(source_addr):
            return False

        # Internal source combined with client-to-server flow ...
        if is_rule_suboption_set(rule, "flow", "to_server") or is_rule_suboption_set(
            rule, "flow", "from_client"
        ):
            return True
        # ... or with a DNS query buffer.
        if is_rule_option_set(rule, "dns.query") or is_rule_option_set(
            rule, "dns_query"
        ):
            return True

        return False

    @staticmethod
    def __does_rule_account_for_server_response(
        rule: idstools.rule.Rule,
    ) -> Optional[bool]:
        """Tell whether the rule looks at the server's response.

        Returns `None` for rules with protocol `ip`; otherwise `True` when the flow
        is server-to-client or the message mentions "response", else `False`.
        """
        if get_rule_option(rule, "proto") == "ip":
            return None

        if is_rule_suboption_set(rule, "flow", "from_server"):
            return True
        if is_rule_suboption_set(rule, "flow", "to_client"):
            return True

        msg = get_rule_option(rule, "msg")
        assert msg is not None
        return "response" in msg.lower()

    @staticmethod
    def __does_rule_account_for_internal_content(
        rule: idstools.rule.Rule,
    ) -> bool:
        """Tell whether the source address is neither `any`, `$EXTERNAL_NET` nor a literal IP address."""
        source_addr = get_rule_option(rule, "source_addr")
        assert source_addr is not None
        if source_addr in ("any", "$EXTERNAL_NET"):
            return False
        return IP_ADDRESS_REGEX.match(source_addr) is None

    @staticmethod
    def __is_rule_stateful(
        rule: idstools.rule.Rule,
    ) -> Optional[bool]:
        """Tell whether the rule tests `flowbits`, `flowint` or `xbits` state."""
        # The negated (isnotset) checks are also used to reduce false positives,
        # so they do not necessarily indicate a stateful rule; they are
        # nevertheless counted as stateful here, after the positive checks.
        state_checks = (
            ("flowbits", _BITS_ISSET_REGEX),
            ("flowint", _FLOWINT_ISSET_REGEX),
            ("xbits", _BITS_ISSET_REGEX),
            ("flowbits", _BITS_ISNOTSET_REGEX),
            ("flowint", _FLOWINT_ISNOTSET_REGEX),
            ("xbits", _BITS_ISNOTSET_REGEX),
        )
        for option_name, pattern in state_checks:
            if is_rule_option_equal_to_regex(rule, option_name, pattern):
                return True

        return False

    @staticmethod
    def __is_rule_threshold_limited(
        rule: idstools.rule.Rule,
    ) -> bool:
        """Tell whether the rule has a `threshold` option matching `_THRESHOLD_LIMITED_REGEX`."""
        threshold = get_rule_option(rule, "threshold")
        if threshold is None:
            return False
        return _THRESHOLD_LIMITED_REGEX.match(threshold) is not None

    @staticmethod
    def __does_rule_have_exceptions(
        rule: idstools.rule.Rule,
    ) -> bool:
        """Tell whether the rule carves out exceptions.

        That is the case when the content-keyword options mix negated (`!`-prefixed)
        and non-negated values, or when a `flowbits` option matches
        `_FLOWBITS_ISNOTSET_REGEX`.
        """
        values = [
            value
            for value in get_rule_options(rule, CONTENT_KEYWORDS)
            if value is not None
        ]
        has_negated = any(value.startswith("!") for value in values)
        has_plain = any(not value.startswith("!") for value in values)

        if has_plain and has_negated:
            return True
        if is_rule_option_equal_to_regex(rule, "flowbits", _FLOWBITS_ISNOTSET_REGEX):
            return True

        return False

    @staticmethod
    def __has_fixed_http_uri_query_parameter_location(
        rule: idstools.rule.Rule,
    ) -> bool:
        """Tell whether the rule's only `content` matches `_HTTP_URI_QUERY_PARAMETER_REGEX` on the rule body."""
        if count_rule_options(rule, "content") != 1:
            return False
        return _HTTP_URI_QUERY_PARAMETER_REGEX.match(get_rule_body(rule)) is not None

    @staticmethod
    def __has_single_match_at_fixed_location(
        rule: idstools.rule.Rule,
    ) -> bool:
        """Tell whether the rule's single distinct `content` is pinned to a fixed location.

        A location counts as fixed when `startswith` is set, or when `depth`,
        `bsize` or (with `http.uri`) `urilen` equals the content length.
        """
        # NOTE(review): this guard bails out for exactly the two-content
        # HTTP-method case that the set difference below appears designed to
        # handle - confirm whether the condition was meant to be negated.
        if (
            count_rule_options(rule, "content") == 2  # noqa: PLR2004
            and is_rule_option_set(rule, "http.method")
            and (
                is_rule_option_equal_to(rule, "content", "GET")
                or is_rule_option_equal_to(rule, "content", "POST")
            )
        ):
            return False

        # Exactly one distinct content value may remain once method literals are dropped.
        remaining = set(get_rule_options(rule, "content")) - {'"GET"', '"POST"'}
        if len(remaining) != 1:
            return False
        (content,) = remaining

        if is_rule_option_set(rule, "startswith"):
            return True

        # -2 to discard quotes
        length = len(content) - 2 if content is not None else -1
        expected = str(length)

        if is_rule_option_equal_to(rule, "depth", expected):
            return True
        if is_rule_option_equal_to(rule, "bsize", expected):
            return True
        if is_rule_option_set(rule, "http.uri") and is_rule_option_equal_to(
            rule, "urilen", expected
        ):
            return True

        return False