Source code for suricata_check.suricata_check

  1"""The `suricata_check.suricata_check` module contains the command line utility and the main program logic."""
  2
  3import atexit
  4import configparser
  5import io
  6import json
  7import logging
  8import logging.handlers
  9import multiprocessing
 10import os
 11import sys
 12import threading
 13from collections.abc import Sequence
 14from typing import (
 15    Any,
 16    Callable,
 17    Literal,
 18    Optional,
 19    TypeVar,
 20    Union,
 21)
 22
 23import click
 24
 25_AnyCallable = Callable[..., Any]
 26_FC = TypeVar("_FC", bound=Union[_AnyCallable, click.Command])
 27
 28LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
 29
 30# Add suricata-check to the front of the PATH, such that the version corresponding to the CLI is used.
 31_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
 32if sys.path[0] != _suricata_check_path:
 33    sys.path.insert(0, _suricata_check_path)
 34
 35from suricata_check._checkers import get_checkers  # noqa: E402
 36from suricata_check._output import (  # noqa: E402
 37    summarize_output,
 38    summarize_rule,
 39    write_output,
 40)
 41from suricata_check._version import (  # noqa: E402
 42    __version__,
 43    check_for_update,
 44    get_dependency_versions,
 45)
 46from suricata_check.checkers.interface import CheckerInterface  # noqa: E402
 47from suricata_check.utils._click import ClickHandler, help_option  # noqa: E402
 48from suricata_check.utils._path import find_rules_file  # noqa: E402
 49from suricata_check.utils.checker import (  # noqa: E402
 50    check_rule_option_recognition,
 51    get_rule_option,
 52    get_rule_suboption,
 53)
 54from suricata_check.utils.checker_typing import (  # noqa: E402
 55    InvalidRuleError,
 56    OutputReport,
 57    RuleReport,
 58)
 59from suricata_check.utils.regex import is_valid_rule  # noqa: E402
 60from suricata_check.utils.regex_provider import get_regex_provider  # noqa: E402
 61from suricata_check.utils.rule import ParsingError, Rule, parse  # noqa: E402
 62
 63LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
 64LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
 65
 66# Define all command line arguments and their properties
 67CLI_ARGUMENTS: dict[str, dict[str, Any]] = {
 68    "rules": {
 69        "help": "Path to Suricata rules to provide check on.",
 70        "show_default": True,
 71        "type": str,
 72        "required": False,
 73        "default": ".",
 74        "cli_options": ["-r"],
 75    },
 76    "single_rule": {
 77        "help": "A single Suricata rule to be checked",
 78        "show_default": False,
 79        "type": str,
 80        "required": False,
 81        "default": None,
 82        "cli_options": ["-s"],
 83    },
 84    "out": {
 85        "help": "Path to suricata-check output folder.",
 86        "show_default": True,
 87        "type": str,
 88        "required": False,
 89        "default": ".",
 90        "cli_options": ["-o"],
 91    },
 92    "log_level": {
 93        "help": f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
 94        "show_default": True,
 95        "type": str,
 96        "required": False,
 97        "default": "DEBUG",
 98    },
 99    "gitlab": {
100        "help": "Flag to create CodeClimate output report for GitLab CI/CD.",
101        "show_default": True,
102        "type": bool,
103        "required": False,
104        "default": False,
105        "is_flag": True,
106    },
107    "github": {
108        "help": "Flag to write workflow commands to stdout for GitHub CI/CD.",
109        "show_default": True,
110        "type": bool,
111        "required": False,
112        "default": False,
113        "is_flag": True,
114    },
115    "evaluate_disabled": {
116        "help": "Flag to evaluate disabled rules.",
117        "show_default": True,
118        "type": bool,
119        "required": False,
120        "default": False,
121        "is_flag": True,
122    },
123    "issue_severity": {
124        "help": f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
125        "show_default": True,
126        "type": str,
127        "required": False,
128        "default": "INFO",
129    },
130    "include_all": {
131        "help": "Flag to indicate all checker codes should be enabled.",
132        "show_default": True,
133        "type": bool,
134        "required": False,
135        "default": False,
136        "is_flag": True,
137        "cli_options": ["-a"],
138    },
139    "include": {
140        "help": "List of all checker codes to enable.",
141        "show_default": True,
142        "type": tuple,
143        "required": False,
144        "default": (),
145        "multiple": True,
146        "cli_options": ["-i"],
147    },
148    "exclude": {
149        "help": "List of all checker codes to disable.",
150        "show_default": True,
151        "type": tuple,
152        "required": False,
153        "default": (),
154        "multiple": True,
155        "cli_options": ["-e"],
156    },
157}
158CLI_ARGUMENT_TYPE = Optional[Union[str, bool, tuple]]
159
160_logger = logging.getLogger(__name__)
161
162_regex_provider = get_regex_provider()
163
164
165def __create_click_option(name: str, props: dict[str, Any]) -> Callable[[_FC], _FC]:
166    """Create a click.option decorator from argument properties."""
167    kwargs = {
168        "help": props["help"],
169        "show_default": props["show_default"],
170        "default": props.get("default"),
171        "is_flag": props.get("is_flag", False),
172        "multiple": props.get("multiple", False),
173    }
174
175    # Add any additional CLI options (like -r, -s, etc.)
176    cli_opts = props.get("cli_options", [])
177    args = [f"--{name.replace('_', '-')}", *cli_opts]
178
179    return click.option(*args, **kwargs)
180
181
182def __main_decorators() -> Callable[[Callable], click.Command]:
183    """Create the CLI command with all options."""
184
185    def decorator(f: Callable) -> click.Command:
186        # Apply all options in reverse order (bottom to top)
187        command = click.command()(f)
188        command = help_option("-h", "--help")(command)
189
190        # Add ini option first since it's needed before processing other options
191        command = click.option(
192            "--ini",
193            help="Path to suricata-check.ini file to read configuration from.",
194            show_default=True,
195            type=str,
196            default=None,
197        )(command)
198
199        # Apply options from CLI_ARGUMENTS
200        for name, props in reversed(CLI_ARGUMENTS.items()):
201            command = __create_click_option(name, props)(command)
202
203        return command
204
205    return decorator
206
207
208@__main_decorators()
209def main(**kwargs: dict[str, Any]) -> None:  # noqa: C901, PLR0915
210    """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.
211
212    Raises:
213      BadParameter: If provided arguments are invalid.
214
215      RuntimeError: If no checkers could be automatically discovered.
216
217    """
218    # Look for a ini file and parse it.
219    ini_kwargs = get_ini_kwargs(
220        str(kwargs["ini"]) if kwargs["ini"] is not None else None  # type: ignore reportUnnecessaryComparison
221    )
222
223    # Verify CLI argument types and get CLI arguments or use default arguments
224    rules: str = __get_verified_kwarg(
225        [kwargs, ini_kwargs], "rules"
226    )  # pyright: ignore[reportAssignmentType]
227    single_rule: Optional[str] = __get_verified_kwarg(
228        [kwargs, ini_kwargs], "single_rule"
229    )  # pyright: ignore[reportAssignmentType]
230    out: str = __get_verified_kwarg(
231        [kwargs, ini_kwargs], "out"
232    )  # pyright: ignore[reportAssignmentType]
233    log_level: LogLevel = __get_verified_kwarg(
234        [kwargs, ini_kwargs], "log_level"
235    )  # pyright: ignore[reportAssignmentType]
236    gitlab: bool = __get_verified_kwarg(
237        [kwargs, ini_kwargs], "gitlab"
238    )  # pyright: ignore[reportAssignmentType]
239    github: bool = __get_verified_kwarg(
240        [kwargs, ini_kwargs], "github"
241    )  # pyright: ignore[reportAssignmentType]
242    evaluate_disabled: bool = __get_verified_kwarg(
243        [kwargs, ini_kwargs], "evaluate_disabled"
244    )  # pyright: ignore[reportAssignmentType]
245    issue_severity: LogLevel = __get_verified_kwarg(
246        [kwargs, ini_kwargs], "issue_severity"
247    )  # pyright: ignore[reportAssignmentType]
248    include_all: bool = __get_verified_kwarg(
249        [kwargs, ini_kwargs], "include_all"
250    )  # pyright: ignore[reportAssignmentType]
251    include: tuple[str, ...] = __get_verified_kwarg(
252        [kwargs, ini_kwargs], "include"
253    )  # pyright: ignore[reportAssignmentType]
254    exclude: tuple[str, ...] = __get_verified_kwarg(
255        [kwargs, ini_kwargs], "exclude"
256    )  # pyright: ignore[reportAssignmentType]
257
258    # Verify that out argument is valid
259    if os.path.exists(out) and not os.path.isdir(out):
260        raise click.BadParameter(f"Error: {out} is not a directory.")
261
262    # Verify that log_level argument is valid
263    if log_level not in LOG_LEVELS:
264        raise click.BadParameter(f"Error: {log_level} is not a valid log level.")
265
266    # Create out directory if non-existent
267    if not os.path.exists(out):
268        os.makedirs(out)
269
270    # Setup logging from a seperate thread
271    queue = multiprocessing.Manager().Queue()
272    queue_handler = logging.handlers.QueueHandler(queue)
273
274    click_handler = ClickHandler(
275        github=github, github_level=getattr(logging, log_level)
276    )
277    logging.basicConfig(
278        level=log_level,
279        format="%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s",
280        handlers=(queue_handler, click_handler),
281        force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", "FALSE") == "TRUE",
282    )
283
284    file_handler = logging.FileHandler(
285        filename=os.path.join(out, "suricata-check.log"),
286        delay=True,
287    )
288    queue_listener = logging.handlers.QueueListener(
289        queue,
290        file_handler,
291        respect_handler_level=True,
292    )
293
294    def _at_exit() -> None:
295        """Cleans up logging listener and handlers before exiting."""
296        queue_listener.enqueue_sentinel()
297        queue_listener.stop()
298        file_handler.flush()
299        file_handler.close()
300        atexit.unregister(_at_exit)
301
302    atexit.register(_at_exit)
303
304    queue_listener.start()
305
306    # Log the arguments:
307    _logger.info("Running suricata-check with the following arguments:")
308    for arg in CLI_ARGUMENTS:
309        _logger.info("%s: %s", arg, locals().get(arg))
310
311    # Log the environment:
312    _logger.debug("Platform: %s", sys.platform)
313    _logger.debug("Python version: %s", sys.version)
314    _logger.debug("suricata-check path: %s", _suricata_check_path)
315    _logger.debug("suricata-check version: %s", __version__)
316    for package, version in get_dependency_versions().items():
317        _logger.debug("Dependency %s version: %s", package, version)
318
319    threading.Thread(
320        target=check_for_update,
321    ).start()
322
323    # Verify that include and exclude arguments are valid
324    if include_all and len(include) > 0:
325        raise click.BadParameter(
326            "Error: Cannot use --include-all and --include together."
327        )
328    if include_all:
329        include = (".*",)
330
331    # Verify that issue_severity argument is valid
332    if issue_severity not in LOG_LEVELS:
333        raise click.BadParameter(
334            f"Error: {issue_severity} is not a valid issue severity or log level."
335        )
336
337    checkers = get_checkers(
338        include, exclude, issue_severity=getattr(logging, issue_severity)
339    )
340
341    if single_rule is not None:
342        __main_single_rule(out, single_rule, checkers)
343
344        # Return here so no rules file is processed.
345        _at_exit()
346        return
347
348    # Check if the rules argument is valid and find the rules file
349    rules = find_rules_file(rules)
350
351    output = process_rules_file(rules, evaluate_disabled, checkers=checkers)
352
353    write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)
354
355    _at_exit()
356
357
[docs] 358def get_ini_kwargs(path: Optional[str]) -> dict[str, Any]: 359 """Read configuration from INI file based on CLI_ARGUMENTS structure.""" 360 ini_kwargs: dict[str, Any] = {} 361 if path is not None and not os.path.exists(path): 362 raise click.BadParameter( 363 f"Error: INI file provided in {path} but no options loaded" 364 ) 365 366 # Use the default path if no path was provided 367 if path is None: 368 path = "suricata-check.ini" 369 if not os.path.exists(path): 370 return {} 371 372 config_parser = configparser.ConfigParser( 373 empty_lines_in_values=False, 374 default_section="suricata-check", 375 converters={"tuple": lambda x: tuple(json.loads(x))}, 376 ) 377 config_parser.read(path) 378 379 # Process each argument defined in CLI_ARGUMENTS 380 for arg_name, arg_props in CLI_ARGUMENTS.items(): 381 ini_key = arg_name.replace("_", "-") 382 if not config_parser.has_option("suricata-check", ini_key): 383 continue 384 385 # Get the value based on the argument type 386 if arg_props["type"] is bool: 387 ini_kwargs[arg_name] = config_parser.getboolean("suricata-check", ini_key) 388 elif arg_props["type"] is tuple: 389 ini_kwargs[arg_name] = config_parser.gettuple("suricata-check", ini_key) # type: ignore reportAttributeAccessIssue 390 else: 391 ini_kwargs[arg_name] = config_parser.get("suricata-check", ini_key) 392 if arg_props["type"] is str: 393 ini_kwargs[arg_name] = ini_kwargs[arg_name].strip('"') 394 395 return ini_kwargs
396 397 398def __get_verified_kwarg( 399 kwargss: Sequence[dict[str, Any]], 400 name: str, 401) -> CLI_ARGUMENT_TYPE: 402 for kwargs in kwargss: 403 if name in kwargs: 404 if kwargs[name] is None: 405 if ( 406 not CLI_ARGUMENTS[name]["required"] 407 and CLI_ARGUMENTS[name]["default"] is not None 408 ): 409 return None 410 return CLI_ARGUMENTS[name]["default"] 411 412 if kwargs[name] is not CLI_ARGUMENTS[name]["default"]: 413 if not isinstance(kwargs[name], CLI_ARGUMENTS[name]["type"]): 414 raise click.BadParameter( 415 f"""Error: \ 416 Argument `{name}` should have a value of type `{CLI_ARGUMENTS[name]["type"]}` \ 417 but has value {kwargs[name]} of type {kwargs[name].__class__} instead.""" 418 ) 419 return kwargs[name] 420 421 return CLI_ARGUMENTS[name]["default"] 422 423 424def __main_single_rule( 425 out: str, single_rule: str, checkers: Optional[Sequence[CheckerInterface]] 426) -> None: 427 rule: Optional[Rule] = parse(single_rule) 428 429 # Verify that a rule was parsed correctly. 430 if rule is None: 431 msg = f"Error parsing rule from user input: {single_rule}" 432 _logger.critical(msg) 433 raise click.BadParameter(f"Error: {msg}") 434 435 if not is_valid_rule(rule): 436 msg = f"Error parsing rule from user input: {single_rule}" 437 _logger.critical(msg) 438 raise click.BadParameter(f"Error: {msg}") 439 440 _logger.debug("Processing rule: %s", get_rule_option(rule, "sid")) 441 442 rule_report = analyze_rule(rule, checkers=checkers) 443 444 write_output(OutputReport(rules=[rule_report]), out) 445 446
[docs] 447def process_rules_file( # noqa: C901, PLR0912, PLR0915 448 rules: str, 449 evaluate_disabled: bool, 450 checkers: Optional[Sequence[CheckerInterface]] = None, 451) -> OutputReport: 452 """Processes a rule file and returns a list of rules and their issues. 453 454 Args: 455 rules: A path to a Suricata rules file. 456 evaluate_disabled: A flag indicating whether disabled rules should be evaluated. 457 checkers: The checkers to be used when processing the rule file. 458 459 Returns: 460 A list of rules and their issues. 461 462 Raises: 463 RuntimeError: If no checkers could be automatically discovered. 464 465 """ 466 if checkers is None: 467 checkers = get_checkers() 468 469 output = OutputReport() 470 471 with ( 472 open( 473 os.path.normpath(rules), 474 buffering=io.DEFAULT_BUFFER_SIZE, 475 ) as rules_fh, 476 ): 477 if len(checkers) == 0: 478 msg = "No checkers provided for processing rules." 479 _logger.error(msg) 480 raise RuntimeError(msg) 481 482 _logger.info("Processing rule file: %s", rules) 483 484 collected_multiline_parts: Optional[str] = None 485 multiline_begin_number: Optional[int] = None 486 487 for number, line in enumerate(rules_fh.readlines(), start=1): 488 # First work on collecting and parsing multiline rules 489 if line.rstrip("\r\n").endswith("\\"): 490 multiline_part = line.rstrip("\r\n")[:-1] 491 492 if collected_multiline_parts is None: 493 collected_multiline_parts = multiline_part 494 multiline_begin_number = number 495 else: 496 collected_multiline_parts += multiline_part.lstrip() 497 498 continue 499 500 # Process final part of multiline rule if one is being collected 501 if collected_multiline_parts is not None: 502 collected_multiline_parts += line.lstrip() 503 504 rule_line = collected_multiline_parts.strip() 505 506 collected_multiline_parts = None 507 # If no multiline rule is being collected process as a potential single line rule 508 else: 509 if len(line.strip()) == 0: 510 continue 511 512 if line.strip().startswith("#"): 513 if evaluate_disabled: 514 # Verify that this line is a rule and not a comment 515 if parse(line) is None: 516 # Log the comment since it may be a invalid rule 517 _logger.warning( 518 "Ignoring comment on line %i: %s", number, line 519 ) 520 continue 521 else: 522 # Skip the rule 523 continue 524 525 rule_line = line.strip() 526 527 try: 528 rule: Optional[Rule] = parse(rule_line) 529 except ParsingError: 530 _logger.error( 531 "Internal error in parsing of rule on line %i: %s", 532 number, 533 rule_line, 534 ) 535 rule = None 536 537 # Parse comment and potential ignore comment to ignore rules 538 ignore = __parse_type_ignore(rule) 539 540 # Verify that a rule was parsed correctly. 541 if rule is None: 542 _logger.error("Error parsing rule on line %i: %s", number, rule_line) 543 continue 544 545 if not is_valid_rule(rule): 546 _logger.error("Invalid rule on line %i: %s", number, rule_line) 547 continue 548 549 _logger.debug( 550 "Processing rule: %s on line %i", get_rule_option(rule, "sid"), number 551 ) 552 553 rule_report: RuleReport = analyze_rule( 554 rule, 555 checkers=checkers, 556 ignore=ignore, 557 ) 558 rule_report.line_begin = multiline_begin_number or number 559 rule_report.line_end = number 560 561 output.rules.append(rule_report) 562 563 multiline_begin_number = None 564 565 _logger.info("Completed processing rule file: %s", rules) 566 567 output.summary = summarize_output(output, checkers) 568 569 return output
570 571 572def __parse_type_ignore(rule: Optional[Rule]) -> Optional[Sequence[str]]: 573 if rule is None: 574 return None 575 576 ignore_value = get_rule_suboption(rule, "metadata", "suricata-check") 577 if ignore_value is None: 578 return [] 579 580 return ignore_value.strip(' "').split(",") 581 582
[docs] 583def analyze_rule( 584 rule: Rule, 585 checkers: Optional[Sequence[CheckerInterface]] = None, 586 ignore: Optional[Sequence[str]] = None, 587) -> RuleReport: 588 """Checks a rule and returns a dictionary containing the rule and a list of issues found. 589 590 Args: 591 rule: The rule to be checked. 592 checkers: The checkers to be used to check the rule. 593 ignore: Regular expressions to match checker codes to ignore 594 595 Returns: 596 A list of issues found in the rule. 597 Each issue is typed as a `dict`. 598 599 Raises: 600 InvalidRuleError: If the rule does not follow the Suricata syntax. 601 602 """ 603 if not is_valid_rule(rule): 604 raise InvalidRuleError(rule.raw) 605 606 check_rule_option_recognition(rule) 607 608 if checkers is None: 609 checkers = get_checkers() 610 611 rule_report: RuleReport = RuleReport(rule=rule) 612 613 _logger.warning(ignore) 614 615 compiled_ignore = ( 616 [_regex_provider.compile(r) for r in ignore] if ignore is not None else [] 617 ) 618 619 for checker in checkers: 620 try: 621 issues = checker.check_rule(rule) 622 for r in compiled_ignore: 623 issues = list(filter(lambda issue: r.match(issue.code) is None, issues)) 624 rule_report.add_issues(issues) 625 except Exception as exception: # noqa: BLE001 626 _logger.warning( 627 "Failed to run %s on rule: %s", 628 checker.__class__.__name__, 629 rule.raw, 630 extra={"exception": exception}, 631 ) 632 633 rule_report.summary = summarize_rule(rule_report, checkers) 634 635 return rule_report
636 637 638if __name__ == "__main__": 639 main()