Source code for suricata_check._suricata_check

  1"""The `suricata_check._suricata_check` module contains the command line utility and the main program logic."""
  2
  3import atexit
  4import configparser
  5import io
  6import json
  7import logging
  8import logging.handlers
  9import multiprocessing
 10import os
 11import sys
 12import threading
 13from collections.abc import Sequence
 14from typing import (
 15    Any,
 16    Callable,
 17    Literal,
 18    Optional,
 19    TypeVar,
 20    Union,
 21)
 22
 23import click
 24
# Alias for a callable with an arbitrary signature and return value.
_AnyCallable = Callable[..., Any]
# Type variable bound to plain callables or click commands; used to annotate
# option decorators that hand back the same kind of object they received.
_FC = TypeVar("_FC", bound=Union[_AnyCallable, click.Command])
 27
 28LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
 29LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
 30
 31# Define all command line arguments and their properties
 32CLI_ARGUMENTS: dict[str, dict[str, Any]] = {
 33    "rules": {
 34        "help": "Path to Suricata rules to provide check on.",
 35        "show_default": True,
 36        "type": str,
 37        "required": False,
 38        "default": ".",
 39        "cli_options": ["-r"],
 40    },
 41    "single_rule": {
 42        "help": "A single Suricata rule to be checked",
 43        "show_default": False,
 44        "type": str,
 45        "required": False,
 46        "default": None,
 47        "cli_options": ["-s"],
 48    },
 49    "out": {
 50        "help": "Path to suricata-check output folder.",
 51        "show_default": True,
 52        "type": str,
 53        "required": False,
 54        "default": ".",
 55        "cli_options": ["-o"],
 56    },
 57    "log_level": {
 58        "help": f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
 59        "show_default": True,
 60        "type": str,
 61        "required": False,
 62        "default": "INFO",
 63    },
 64    "gitlab": {
 65        "help": "Flag to create CodeClimate output report for GitLab CI/CD.",
 66        "show_default": True,
 67        "type": bool,
 68        "required": False,
 69        "default": False,
 70        "is_flag": True,
 71    },
 72    "github": {
 73        "help": "Flag to write workflow commands to stdout for GitHub CI/CD.",
 74        "show_default": True,
 75        "type": bool,
 76        "required": False,
 77        "default": False,
 78        "is_flag": True,
 79    },
 80    "evaluate_disabled": {
 81        "help": "Flag to evaluate disabled rules.",
 82        "show_default": True,
 83        "type": bool,
 84        "required": False,
 85        "default": False,
 86        "is_flag": True,
 87    },
 88    "issue_severity": {
 89        "help": f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
 90        "show_default": True,
 91        "type": str,
 92        "required": False,
 93        "default": "INFO",
 94    },
 95    "include_all": {
 96        "help": "Flag to indicate all checker codes should be enabled.",
 97        "show_default": True,
 98        "type": bool,
 99        "required": False,
100        "default": False,
101        "is_flag": True,
102        "cli_options": ["-a"],
103    },
104    "include": {
105        "help": "List of all checker codes to enable. Regexes can be provided.",
106        "show_default": True,
107        "type": tuple,
108        "required": False,
109        "default": (),
110        "multiple": True,
111        "cli_options": ["-i"],
112    },
113    "exclude": {
114        "help": "List of all checker codes to disable. Regexes can be provided.",
115        "show_default": True,
116        "type": tuple,
117        "required": False,
118        "default": (),
119        "multiple": True,
120        "cli_options": ["-e"],
121    },
122}
123CLI_ARGUMENT_TYPE = Optional[Union[str, bool, tuple]]
124
# Add the parent of this file's directory to the front of `sys.path`, such that
# the suricata-check version corresponding to the CLI is the one imported below.
_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Only insert when it is not already the first entry, to avoid duplicates.
if sys.path[0] != _suricata_check_path:
    sys.path.insert(0, _suricata_check_path)
129
130from suricata_check._checkers import get_checkers  # noqa: E402
131from suricata_check._output import (  # noqa: E402
132    summarize_output,
133    summarize_rule,
134    write_output,
135)
136from suricata_check._version import (  # noqa: E402
137    __version__,
138    check_for_update,
139    get_dependency_versions,
140)
141from suricata_check.checkers.interface import CheckerInterface  # noqa: E402
142from suricata_check.utils._click import ClickHandler, help_option  # noqa: E402
143from suricata_check.utils._path import find_rules_file  # noqa: E402
144from suricata_check.utils.checker import (  # noqa: E402
145    check_rule_option_recognition,
146    get_rule_option,
147    get_rule_suboption,
148)
149from suricata_check.utils.checker_typing import (  # noqa: E402
150    InvalidRuleError,
151    OutputReport,
152    RuleReport,
153)
154from suricata_check.utils.regex import is_valid_rule  # noqa: E402
155from suricata_check.utils.regex_provider import get_regex_provider  # noqa: E402
156from suricata_check.utils.rule import ParsingError, Rule, parse  # noqa: E402
157
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)

# Regex provider used in this module to compile the ignore patterns in `analyze_rule`.
_regex_provider = get_regex_provider()
161
162
def __create_click_option(name: str, props: dict[str, Any]) -> Callable[[_FC], _FC]:
    """Build the `click.option` decorator for one entry of `CLI_ARGUMENTS`.

    Args:
        name: Name of the argument; underscores are replaced by dashes to form
            the long option (e.g. `log_level` becomes `--log-level`).
        props: Properties of the argument. `help` and `show_default` are
            mandatory, while `default`, `is_flag`, `multiple` and `cli_options`
            are optional. The `type` property is not forwarded to click.

    Returns:
        The decorator that attaches the option to a command or callback.

    """
    long_option = "--" + name.replace("_", "-")
    # Additional option names such as -r or -s, if any were declared
    extra_options = props.get("cli_options", [])

    return click.option(
        long_option,
        *extra_options,
        help=props["help"],
        show_default=props["show_default"],
        default=props.get("default"),
        is_flag=props.get("is_flag", False),
        multiple=props.get("multiple", False),
    )
178
179
def __main_decorators() -> Callable[[Callable], click.Command]:
    """Return a decorator turning a function into the CLI command with all its options."""

    def build_command(f: Callable) -> click.Command:
        # Wrap the function in a click command that supports -h and --help
        command = help_option("-h", "--help")(click.command()(f))

        # The --ini option is attached before the options from CLI_ARGUMENTS
        ini_option = click.option(
            "--ini",
            help="Path to suricata-check.ini file to read configuration from.",
            show_default=True,
            type=str,
            default=None,
        )
        command = ini_option(command)

        # Attach one option per CLI_ARGUMENTS entry, in declaration order
        for name, props in CLI_ARGUMENTS.items():
            attach_option = __create_click_option(name, props)
            command = attach_option(command)

        return command

    return build_command
204
205
@__main_decorators()
def main(**kwargs: dict[str, Any]) -> None:  # noqa: C901, PLR0915
    """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.

    Check the CLI usage documentation for a full overview of how to use the CLI: https://suricata-check.teuwen.net/cli_usage.html
    """
    # Options from an ini file are consulted after the options passed on the CLI.
    ini_path = kwargs["ini"]
    ini_kwargs = get_ini_kwargs(
        None if ini_path is None else str(ini_path),  # type: ignore reportUnnecessaryComparison
    )
    sources = [kwargs, ini_kwargs]

    # Resolve and type-check every argument.
    # NOTE: the local names must equal the keys of CLI_ARGUMENTS, since the
    # arguments are logged further below by looking them up in locals().
    rules: str = __get_verified_kwarg(
        sources,
        "rules",
    )  # pyright: ignore[reportAssignmentType]
    single_rule: Optional[str] = __get_verified_kwarg(
        sources,
        "single_rule",
    )  # pyright: ignore[reportAssignmentType]
    out: str = __get_verified_kwarg(
        sources,
        "out",
    )  # pyright: ignore[reportAssignmentType]
    log_level: LogLevel = __get_verified_kwarg(
        sources,
        "log_level",
    )  # pyright: ignore[reportAssignmentType]
    gitlab: bool = __get_verified_kwarg(
        sources,
        "gitlab",
    )  # pyright: ignore[reportAssignmentType]
    github: bool = __get_verified_kwarg(
        sources,
        "github",
    )  # pyright: ignore[reportAssignmentType]
    evaluate_disabled: bool = __get_verified_kwarg(
        sources,
        "evaluate_disabled",
    )  # pyright: ignore[reportAssignmentType]
    issue_severity: LogLevel = __get_verified_kwarg(
        sources,
        "issue_severity",
    )  # pyright: ignore[reportAssignmentType]
    include_all: bool = __get_verified_kwarg(
        sources,
        "include_all",
    )  # pyright: ignore[reportAssignmentType]
    include: tuple[str, ...] = __get_verified_kwarg(
        sources,
        "include",
    )  # pyright: ignore[reportAssignmentType]
    exclude: tuple[str, ...] = __get_verified_kwarg(
        sources,
        "exclude",
    )  # pyright: ignore[reportAssignmentType]

    # The output path must be a directory if it already exists
    out_exists = os.path.exists(out)
    if out_exists and not os.path.isdir(out):
        raise click.BadParameter(f"Error: {out} is not a directory.")

    # The log level must be one of the known levels
    if log_level not in LOG_LEVELS:
        raise click.BadParameter(f"Error: {log_level} is not a valid log level.")

    # Create the output directory when it does not exist yet
    if not out_exists:
        os.makedirs(out)

    # Records are put on a queue and written to the log file by a listener thread
    log_queue = multiprocessing.Manager().Queue()
    queue_handler = logging.handlers.QueueHandler(log_queue)
    click_handler = ClickHandler(
        github=github,
        github_level=getattr(logging, log_level),
    )
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s",
        handlers=(queue_handler, click_handler),
        force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", "FALSE") == "TRUE",
    )

    file_handler = logging.FileHandler(
        filename=os.path.join(out, "suricata-check.log"),
        delay=True,
    )
    queue_listener = logging.handlers.QueueListener(
        log_queue,
        file_handler,
        respect_handler_level=True,
    )

    def _at_exit() -> None:
        """Stop the queue listener and flush and close the log file handler."""
        queue_listener.enqueue_sentinel()
        queue_listener.stop()
        file_handler.flush()
        file_handler.close()
        # Prevent a second run at interpreter exit after an explicit call
        atexit.unregister(_at_exit)

    atexit.register(_at_exit)
    queue_listener.start()

    # Log the arguments
    _logger.info("Running suricata-check with the following arguments:")
    for arg in CLI_ARGUMENTS:
        _logger.debug("%s: %s", arg, locals().get(arg))

    # Log the environment
    _logger.debug("Platform: %s", sys.platform)
    _logger.debug("Python version: %s", sys.version)
    _logger.debug("suricata-check path: %s", _suricata_check_path)
    _logger.debug("suricata-check version: %s", __version__)
    for package, version in get_dependency_versions().items():
        _logger.debug("Dependency %s version: %s", package, version)

    # Run the update check on its own thread
    update_thread = threading.Thread(target=check_for_update)
    update_thread.start()

    # --include-all cannot be combined with --include and selects every code
    if include_all:
        if include:
            raise click.BadParameter(
                "Error: Cannot use --include-all and --include together.",
            )
        include = (".*",)

    # The issue severity must be one of the known levels
    if issue_severity not in LOG_LEVELS:
        raise click.BadParameter(
            f"Error: {issue_severity} is not a valid issue severity or log level.",
        )

    checkers = get_checkers(
        include,
        exclude,
        issue_severity=getattr(logging, issue_severity),
    )

    if single_rule is None:
        # Check if the rules argument is valid and find the rules file
        rules = find_rules_file(rules)
        output = process_rules_file(rules, evaluate_disabled, checkers=checkers)
        write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)
    else:
        # Only the single rule is checked, so no rules file is processed
        __main_single_rule(out, single_rule, checkers)

    _at_exit()
364
365
def get_ini_kwargs(path: Optional[str]) -> dict[str, Any]:
    """Load the values of the arguments in `CLI_ARGUMENTS` from an INI file.

    Args:
        path: Path to the INI file. If `None`, the file `suricata-check.ini`
            relative to the current working directory is used when it exists.

    Returns:
        The values found in the `suricata-check` section keyed by argument
        name. Empty when no path was given and the default file does not exist.

    Raises:
        click.BadParameter: If a path was given but does not exist.

    """
    section = "suricata-check"

    if path is None:
        # Fall back to the default file, which is allowed to be absent
        path = "suricata-check.ini"
        if not os.path.exists(path):
            return {}
    elif not os.path.exists(path):
        raise click.BadParameter(
            f"Error: INI file provided in {path} but no options loaded",
        )

    config_parser = configparser.ConfigParser(
        empty_lines_in_values=False,
        default_section=section,
        # Tuples are written as JSON lists in the INI file
        converters={"tuple": lambda x: tuple(json.loads(x))},
    )
    config_parser.read(path)

    ini_kwargs: dict[str, Any] = {}
    for arg_name, arg_props in CLI_ARGUMENTS.items():
        # Keys in the INI file use dashes like the long CLI options
        ini_key = arg_name.replace("_", "-")
        if not config_parser.has_option(section, ini_key):
            continue

        # Read the value with the getter matching the argument type
        arg_type = arg_props["type"]
        if arg_type is bool:
            value = config_parser.getboolean(section, ini_key)
        elif arg_type is tuple:
            value = config_parser.gettuple(section, ini_key)  # type: ignore reportAttributeAccessIssue
        else:
            value = config_parser.get(section, ini_key)
            if arg_type is str:
                # Strings may be surrounded by double quotes in the INI file
                value = value.strip('"')

        ini_kwargs[arg_name] = value

    return ini_kwargs
def __get_verified_kwarg(
    kwargss: Sequence[dict[str, Any]],
    name: str,
) -> CLI_ARGUMENT_TYPE:
    """Look up an argument in several sources and verify the type of its value.

    The sources are consulted in order. A value that is the very same object as
    the default in `CLI_ARGUMENTS` is skipped, so that a later source can
    provide the value instead.

    Args:
        kwargss: The sources of argument values, in order of precedence.
        name: Name of the argument, which must be a key of `CLI_ARGUMENTS`.

    Returns:
        The first verified non-default value, or the default of the argument.

    Raises:
        click.BadParameter: If a value is not an instance of the declared type.

    """
    spec = CLI_ARGUMENTS[name]
    default = spec["default"]

    for kwargs in kwargss:
        if name not in kwargs:
            continue

        value = kwargs[name]

        if value is None:
            # NOTE(review): a `None` value ends the lookup right here, so later
            # sources are not consulted for this argument - confirm intended.
            if not spec["required"] and default is not None:
                return None
            return default

        # Identity (not equality) decides whether the value is the default
        if value is default:
            continue

        expected_type = spec["type"]
        if not isinstance(value, expected_type):
            raise click.BadParameter(
                f"Error: Argument `{name}` should have a value of type `{expected_type}` "
                f"but has value {value} of type {value.__class__} instead.",
            )
        return value

    return default


def __main_single_rule(
    out: str,
    single_rule: str,
    checkers: Optional[Sequence[CheckerInterface]],
) -> None:
    """Check a single rule passed as text and write the report to the output folder.

    Args:
        out: Path to the output folder.
        single_rule: The text of the rule to check.
        checkers: The checkers to be used to check the rule.

    Raises:
        click.BadParameter: If the text cannot be parsed into a valid rule.

    """
    rule: Optional[Rule] = parse(single_rule)

    # An unparsable rule and an invalid rule are reported identically.
    if rule is None or not is_valid_rule(rule):
        msg = f"Error parsing rule from user input: {single_rule}"
        _logger.critical(msg)
        raise click.BadParameter(f"Error: {msg}")

    _logger.debug("Processing rule: %s", get_rule_option(rule, "sid"))

    report = OutputReport(rules=[analyze_rule(rule, checkers=checkers)])
    write_output(report, out)
def process_rules_file(  # noqa: C901, PLR0912, PLR0915
    rules: str,
    evaluate_disabled: bool,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputReport:
    """Processes a rule file and returns a report of its rules and their issues.

    Lines ending in a backslash are joined with the following line(s) into a
    single multi-line rule. Empty lines are skipped. Lines starting with `#`
    are skipped unless `evaluate_disabled` is set and the line parses as a rule.

    Args:
        rules: A path to a Suricata rules file.
        evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
        checkers: The checkers to be used when processing the rule file.
            If `None`, the checkers returned by `get_checkers()` are used.

    Returns:
        A report holding one rule report per analyzed rule, annotated with the
        first and last line of the rule, and a summary over all rules.

    Raises:
        RuntimeError: If the sequence of checkers is empty.

    """
    if checkers is None:
        checkers = get_checkers()

    output = OutputReport()

    with open(
        os.path.normpath(rules),
        buffering=io.DEFAULT_BUFFER_SIZE,
    ) as rules_fh:
        if len(checkers) == 0:
            msg = "No checkers provided for processing rules."
            _logger.error(msg)
            raise RuntimeError(msg)

        _logger.info("Processing rule file: %s", rules)

        collected_multiline_parts: Optional[str] = None
        multiline_begin_number: Optional[int] = None

        # Iterate over the file handle lazily instead of loading all lines at once
        for number, line in enumerate(rules_fh, start=1):
            # First work on collecting and parsing multiline rules
            line_without_eol = line.rstrip("\r\n")
            if line_without_eol.endswith("\\"):
                multiline_part = line_without_eol[:-1]

                if collected_multiline_parts is None:
                    collected_multiline_parts = multiline_part
                    multiline_begin_number = number
                else:
                    collected_multiline_parts += multiline_part.lstrip()

                continue

            if collected_multiline_parts is not None:
                # Process final part of multiline rule if one is being collected
                collected_multiline_parts += line.lstrip()
                rule_line = collected_multiline_parts.strip()
                collected_multiline_parts = None
            else:
                # Otherwise process the line as a potential single line rule
                stripped_line = line.strip()
                if len(stripped_line) == 0:
                    continue

                if stripped_line.startswith("#"):
                    if not evaluate_disabled:
                        # Skip the disabled rule or comment
                        continue

                    # Verify that this line is a rule and not a comment.
                    # A parsing failure is treated like a comment instead of
                    # aborting the processing of the entire file.
                    try:
                        disabled_rule = parse(line)
                    except ParsingError:
                        disabled_rule = None

                    if disabled_rule is None:
                        # Log the comment since it may be an invalid rule
                        _logger.warning(
                            "Ignoring comment on line %i: %s",
                            number,
                            line,
                        )
                        continue

                rule_line = stripped_line

            # Determine the first line of the rule and clear the marker right
            # away, so that a rule failing to parse below cannot leak its
            # first line number into the report of a following rule.
            line_begin = multiline_begin_number or number
            multiline_begin_number = None

            try:
                rule: Optional[Rule] = parse(rule_line)
            except ParsingError:
                _logger.error(
                    "Internal error in parsing of rule on line %i: %s",
                    number,
                    rule_line,
                )
                rule = None

            # Parse comment and potential ignore comment to ignore rules
            ignore = __parse_type_ignore(rule)

            # Verify that a rule was parsed correctly.
            if rule is None:
                _logger.error("Error parsing rule on line %i: %s", number, rule_line)
                continue

            if not is_valid_rule(rule):
                _logger.error("Invalid rule on line %i: %s", number, rule_line)
                continue

            _logger.debug(
                "Processing rule: %s on line %i",
                get_rule_option(rule, "sid"),
                number,
            )

            rule_report: RuleReport = analyze_rule(
                rule,
                checkers=checkers,
                ignore=ignore,
            )
            rule_report.line_begin = line_begin
            rule_report.line_end = number

            output.rules.append(rule_report)

        # A continuation that is still open at the end of the file has no final
        # line, so it is never processed; report it instead of dropping it silently.
        if collected_multiline_parts is not None:
            _logger.warning(
                "Ignoring unterminated multi-line rule starting on line %s: %s",
                multiline_begin_number,
                collected_multiline_parts.strip(),
            )

    _logger.info("Completed processing rule file: %s", rules)

    output.summary = summarize_output(output, checkers)

    return output
def __parse_type_ignore(rule: Optional[Rule]) -> Optional[Sequence[str]]:
    """Read the ignore entries from the `suricata-check` metadata of a rule.

    Args:
        rule: The parsed rule, or `None` if parsing failed.

    Returns:
        `None` if no rule was given and an empty list if the rule has no
        `suricata-check` metadata suboption. Otherwise the suboption value,
        stripped of surrounding spaces and double quotes, split on commas.

    """
    if rule is None:
        return None

    ignore_value = get_rule_suboption(rule, "metadata", "suricata-check")

    return [] if ignore_value is None else ignore_value.strip(' "').split(",")
def analyze_rule(
    rule: Rule,
    checkers: Optional[Sequence[CheckerInterface]] = None,
    ignore: Optional[Sequence[str]] = None,
) -> RuleReport:
    """Runs the checkers on a rule and returns a report of the issues found.

    Args:
        rule: The rule to be checked.
        checkers: The checkers to be used to check the rule.
            If `None`, the checkers returned by `get_checkers()` are used.
        ignore: Regular expressions to match checker codes to ignore

    Returns:
        The report of the rule, holding the issues that are not ignored and a
        summary. A checker that raises is logged as a warning and contributes
        no issues.

    Raises:
        InvalidRuleError: If the rule does not follow the Suricata syntax.

    """
    if not is_valid_rule(rule):
        raise InvalidRuleError(rule.raw)

    check_rule_option_recognition(rule)

    if checkers is None:
        checkers = get_checkers()

    rule_report: RuleReport = RuleReport(rule=rule)

    # Compile the ignore patterns once instead of once per checker
    ignore_patterns = (
        [] if ignore is None else [_regex_provider.compile(r) for r in ignore]
    )

    for checker in checkers:
        try:
            issues = checker.check_rule(rule)
            if ignore_patterns:
                # Keep only the issues whose code matches none of the patterns
                issues = [
                    issue
                    for issue in issues
                    if all(
                        pattern.match(issue.code) is None
                        for pattern in ignore_patterns
                    )
                ]
            rule_report.add_issues(issues)
        except Exception as exception:  # noqa: BLE001
            # A failing checker must not prevent the other checkers from running
            _logger.warning(
                "Failed to run %s on rule: %s",
                checker.__class__.__name__,
                rule.raw,
                extra={"exception": exception},
            )

    rule_report.summary = summarize_rule(rule_report, checkers)

    return rule_report
# Run the command line utility when this module is executed as a script.
if __name__ == "__main__":
    main()