Source code for suricata_check.suricata_check

   1"""The `suricata_check.suricata_check` module contains the command line utility and the main program logic."""
   2
   3import atexit
   4import configparser
   5import io
   6import json
   7import logging
   8import logging.handlers
   9import multiprocessing
  10import os
  11import pkgutil
  12import sys
  13from collections import defaultdict
  14from collections.abc import Mapping, Sequence
  15from functools import lru_cache
  16from typing import (
  17    Any,
  18    Literal,
  19    Optional,
  20    TypeVar,
  21    Union,
  22    overload,
  23)
  24
  25import click
  26import idstools.rule
  27import tabulate
  28
  29# Add suricata-check to the front of the PATH, such that the version corresponding to the CLI is used.
  30_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
  31if sys.path[0] != _suricata_check_path:
  32    sys.path.insert(0, _suricata_check_path)
  33
  34from suricata_check import (  # noqa: E402
  35    __version__,
  36    check_for_update,
  37    get_dependency_versions,
  38)
  39from suricata_check.checkers.interface import CheckerInterface  # noqa: E402
  40from suricata_check.checkers.interface.dummy import DummyChecker  # noqa: E402
  41from suricata_check.utils._click import ClickHandler, ClickHelpOption  # noqa: E402
  42from suricata_check.utils._path import find_rules_file  # noqa: E402
  43from suricata_check.utils.checker import (  # noqa: E402
  44    check_rule_option_recognition,
  45    get_rule_suboption,
  46)
  47from suricata_check.utils.checker_typing import (  # noqa: E402
  48    EXTENSIVE_SUMMARY_TYPE,
  49    ISSUES_TYPE,
  50    RULE_REPORTS_TYPE,
  51    RULE_SUMMARY_TYPE,
  52    SIMPLE_SUMMARY_TYPE,
  53    InvalidRuleError,
  54    OutputReport,
  55    OutputSummary,
  56    RuleReport,
  57    get_all_subclasses,
  58)
  59from suricata_check.utils.regex import get_regex_provider, is_valid_rule  # noqa: E402
  60
  61LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
  62LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
  63GITLAB_SEVERITIES = {
  64    logging.DEBUG: "info",
  65    logging.INFO: "info",
  66    logging.WARNING: "minor",
  67    logging.ERROR: "major",
  68    logging.CRITICAL: "critical",
  69}
  70GITHUB_SEVERITIES = {
  71    logging.DEBUG: "debug",
  72    logging.INFO: "notice",
  73    logging.WARNING: "warning",
  74    logging.ERROR: "error",
  75    logging.CRITICAL: "error",
  76}
  77GITHUB_COMMAND = (
  78    "::{level} file={file},line={line},endLine={end_line},title={title}::{message}"
  79)
  80
  81_logger = logging.getLogger(__name__)
  82
  83_regex_provider = get_regex_provider()
  84
  85# Global variable to check if extensions have already been imported in case get_checkers() is called multiple times.
  86suricata_check_extensions_imported = False
  87
  88
  89@click.command()
  90@click.option(
  91    "--ini",
  92    "-i",
  93    help="Path to suricata-check.ini file to read configuration from.",
  94    show_default=True,
  95)
  96@click.option(
  97    "--rules",
  98    "-r",
  99    help="Path to Suricata rules to provide check on.",
 100    show_default=True,
 101)
 102@click.option(
 103    "--single-rule",
 104    "-s",
 105    help="A single Suricata rule to be checked",
 106    show_default=False,
 107)
 108@click.option(
 109    "--out",
 110    "-o",
 111    help="Path to suricata-check output folder.",
 112    show_default=True,
 113)
 114@click.option(
 115    "--log-level",
 116    help=f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
 117    show_default=True,
 118)
 119@click.option(
 120    "--gitlab",
 121    help="Flag to create CodeClimate output report for GitLab CI/CD.",
 122    show_default=True,
 123    is_flag=True,
 124)
 125@click.option(
 126    "--github",
 127    help="Flag to write workflow commands to stdout for GitHub CI/CD.",
 128    show_default=True,
 129    is_flag=True,
 130)
 131@click.option(
 132    "--evaluate-disabled",
 133    help="Flag to evaluate disabled rules.",
 134    show_default=True,
 135    is_flag=True,
 136)
 137@click.option(
 138    "--issue-severity",
 139    help=f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
 140    show_default=True,
 141)
 142@click.option(
 143    "--include-all",
 144    "-a",
 145    help="Flag to indicate all checker codes should be enabled.",
 146    show_default=True,
 147    is_flag=True,
 148)
 149@click.option(
 150    "--include",
 151    "-i",
 152    help="List of all checker codes to enable.",
 153    show_default=True,
 154    multiple=True,
 155)
 156@click.option(
 157    "--exclude",
 158    "-e",
 159    help="List of all checker codes to disable.",
 160    show_default=True,
 161    multiple=True,
 162)
 163@click.help_option("-h", "--help", cls=ClickHelpOption)
 164def main(  # noqa: PLR0915
 165    **kwargs: dict[str, Any],
 166) -> None:
 167    """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.
 168
 169    Raises:
 170      BadParameter: If provided arguments are invalid.
 171
 172      RuntimeError: If no checkers could be automatically discovered.
 173
 174    """
 175    # Look for a ini file and parse it.
 176    ini_kwargs = __get_ini_kwargs(
 177        str(kwargs["ini"]) if kwargs["ini"] is not None else None  # type: ignore reportUnnecessaryComparison
 178    )
 179
 180    # Verify CLI argument types and get CLI arguments or use default arguments
 181    rules: str = __get_verified_kwarg([kwargs, ini_kwargs], "rules", str, False, ".")
 182    single_rule: Optional[str] = __get_verified_kwarg(
 183        [kwargs, ini_kwargs], "single_rule", str, True, None
 184    )
 185    out: str = __get_verified_kwarg([kwargs, ini_kwargs], "out", str, False, ".")
 186    log_level: LogLevel = __get_verified_kwarg(
 187        [kwargs, ini_kwargs], "log_level", str, False, "DEBUG"
 188    )
 189    gitlab: bool = __get_verified_kwarg(
 190        [kwargs, ini_kwargs], "gitlab", bool, False, False
 191    )
 192    github: bool = __get_verified_kwarg(
 193        [kwargs, ini_kwargs], "github", bool, False, False
 194    )
 195    evaluate_disabled: bool = __get_verified_kwarg(
 196        [kwargs, ini_kwargs], "evaluate_disabled", bool, False, False
 197    )
 198    issue_severity: LogLevel = __get_verified_kwarg(
 199        [kwargs, ini_kwargs], "issue_severity", str, False, "INFO"
 200    )
 201    include_all: bool = __get_verified_kwarg(
 202        [kwargs, ini_kwargs], "include_all", bool, False, False
 203    )
 204    include: tuple[str, ...] = __get_verified_kwarg(
 205        [kwargs, ini_kwargs], "include", tuple, False, ()
 206    )
 207    exclude: tuple[str, ...] = __get_verified_kwarg(
 208        [kwargs, ini_kwargs], "exclude", tuple, False, ()
 209    )
 210
 211    # Verify that out argument is valid
 212    if os.path.exists(out) and not os.path.isdir(out):
 213        raise click.BadParameter(f"Error: {out} is not a directory.")
 214
 215    # Verify that log_level argument is valid
 216    if log_level not in LOG_LEVELS:
 217        raise click.BadParameter(f"Error: {log_level} is not a valid log level.")
 218
 219    # Create out directory if non-existent
 220    if not os.path.exists(out):
 221        os.makedirs(out)
 222
 223    # Setup logging from a seperate thread
 224    queue = multiprocessing.Manager().Queue()
 225    queue_handler = logging.handlers.QueueHandler(queue)
 226
 227    click_handler = ClickHandler(
 228        github=github, github_level=getattr(logging, log_level)
 229    )
 230    logging.basicConfig(
 231        level=log_level,
 232        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 233        handlers=(queue_handler, click_handler),
 234        force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", "FALSE") == "TRUE",
 235    )
 236
 237    file_handler = logging.FileHandler(
 238        filename=os.path.join(out, "suricata-check.log"),
 239        delay=True,
 240    )
 241    queue_listener = logging.handlers.QueueListener(
 242        queue,
 243        file_handler,
 244        respect_handler_level=True,
 245    )
 246
 247    def _at_exit() -> None:
 248        """Cleans up logging listener and handlers before exiting."""
 249        queue_listener.enqueue_sentinel()
 250        queue_listener.stop()
 251        file_handler.flush()
 252        file_handler.close()
 253        atexit.unregister(_at_exit)
 254
 255    atexit.register(_at_exit)
 256
 257    queue_listener.start()
 258
 259    # Log the arguments:
 260    _logger.info("Running suricata-check with the following arguments:")
 261    _logger.info("out: %s", out)
 262    _logger.info("rules: %s", rules)
 263    _logger.info("single_rule: %s", single_rule)
 264    _logger.info("log_level: %s", log_level)
 265    _logger.info("gitlab: %s", gitlab)
 266    _logger.info("github: %s", github)
 267    _logger.info("evaluate_disabled: %s", evaluate_disabled)
 268    _logger.info("issue_severity: %s", issue_severity)
 269    _logger.info("include_all: %s", include_all)
 270    _logger.info("include: %s", include)
 271    _logger.info("exclude: %s", exclude)
 272
 273    # Log the environment:
 274    _logger.debug("Platform: %s", sys.platform)
 275    _logger.debug("Python version: %s", sys.version)
 276    _logger.debug("suricata-check path: %s", _suricata_check_path)
 277    _logger.debug("suricata-check version: %s", __version__)
 278    for package, version in get_dependency_versions().items():
 279        _logger.debug("Dependency %s version: %s", package, version)
 280
 281    check_for_update()
 282
 283    # Verify that include and exclude arguments are valid
 284    if include_all and len(include) > 0:
 285        raise click.BadParameter(
 286            "Error: Cannot use --include-all and --include together."
 287        )
 288    if include_all:
 289        include = (".*",)
 290
 291    # Verify that issue_severity argument is valid
 292    if issue_severity not in LOG_LEVELS:
 293        raise click.BadParameter(
 294            f"Error: {issue_severity} is not a valid issue severity or log level."
 295        )
 296
 297    checkers = get_checkers(
 298        include, exclude, issue_severity=getattr(logging, issue_severity)
 299    )
 300
 301    if single_rule is not None:
 302        __main_single_rule(out, single_rule, checkers)
 303
 304        # Return here so no rules file is processed.
 305        _at_exit()
 306        return
 307
 308    # Check if the rules argument is valid and find the rules file
 309    rules = find_rules_file(rules)
 310
 311    output = process_rules_file(rules, evaluate_disabled, checkers=checkers)
 312
 313    __write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)
 314
 315    _at_exit()
 316
 317
 318def __get_ini_kwargs(path: Optional[str]) -> dict[str, Any]:  # noqa: C901, PLR0912
 319    ini_kwargs: dict[str, Any] = {}
 320    if path is not None:
 321        if not os.path.exists(path):
 322            raise click.BadParameter(
 323                f"Error: INI file provided in {path} but no options loaded"
 324            )
 325
 326    # Use the default path if no path was provided
 327    if path is None:
 328        path = "suricata-check.ini"
 329        if not os.path.exists(path):
 330            return {}
 331
 332    config_parser = configparser.ConfigParser(
 333        empty_lines_in_values=False,
 334        default_section="suricata-check",
 335        converters={"tuple": lambda x: tuple(json.loads(x))},
 336    )
 337    config_parser.read(path)
 338    ini_kwargs = {}
 339
 340    if config_parser.has_option("suricata-check", "rules"):
 341        ini_kwargs["rules"] = config_parser.get("suricata-check", "rules")
 342    if config_parser.has_option("suricata-check", "out"):
 343        ini_kwargs["out"] = config_parser.get("suricata-check", "out")
 344    if config_parser.has_option("suricata-check", "log"):
 345        ini_kwargs["log"] = config_parser.get("suricata-check", "log")
 346    if config_parser.has_option("suricata-check", "gitlab"):
 347        ini_kwargs["gitlab"] = config_parser.getboolean("suricata-check", "gitlab")
 348    if config_parser.has_option("suricata-check", "github"):
 349        ini_kwargs["github"] = config_parser.getboolean("suricata-check", "github")
 350    if config_parser.has_option("suricata-check", "evaluate_disabled"):
 351        ini_kwargs["evaluate_disabled"] = config_parser.getboolean(
 352            "suricata-check", "evaluate_disabled"
 353        )
 354    if config_parser.has_option("suricata-check", "issue-severity"):
 355        ini_kwargs["issue_severity"] = config_parser.get(
 356            "suricata-check", "issue-severity"
 357        )
 358    if config_parser.has_option("suricata-check", "include-all"):
 359        ini_kwargs["include_all"] = config_parser.getboolean(
 360            "suricata-check", "include-all"
 361        )
 362    if config_parser.has_option("suricata-check", "include"):
 363        ini_kwargs["include"] = config_parser.gettuple("suricata-check", "include")  # type: ignore reportAttributeAccessIssue
 364    if config_parser.has_option("suricata-check", "exclude"):
 365        ini_kwargs["exclude"] = config_parser.gettuple("suricata-check", "exclude")  # type: ignore reportAttributeAccessIssue
 366
 367    return ini_kwargs
 368
 369
 370D = TypeVar("D")
 371
 372
 373@overload
 374def __get_verified_kwarg(
 375    kwargss: Sequence[dict[str, Any]],
 376    name: str,
 377    expected_type: type,
 378    optional: Literal[True],
 379    default: D,
 380) -> Optional[D]:
 381    pass
 382
 383
 384@overload
 385def __get_verified_kwarg(
 386    kwargss: Sequence[dict[str, Any]],
 387    name: str,
 388    expected_type: type,
 389    optional: Literal[False],
 390    default: D,
 391) -> D:
 392    pass
 393
 394
 395def __get_verified_kwarg(
 396    kwargss: Sequence[dict[str, Any]],
 397    name: str,
 398    expected_type: type,
 399    optional: bool,
 400    default: D,
 401) -> Optional[D]:
 402    for kwargs in kwargss:
 403        if name in kwargs:
 404            if kwargs[name] is None:
 405                if optional and default is not None:
 406                    return None
 407                return default
 408
 409            if kwargs[name] is not default:
 410                if not isinstance(kwargs[name], expected_type):
 411                    raise click.BadParameter(
 412                        f"""Error: \
 413                Argument `{name}` should have a value of type `{expected_type}` \
 414                but has value {kwargs[name]} of type {kwargs[name].__class__} instead."""
 415                    )
 416                return kwargs[name]
 417
 418    return default
 419
 420
 421def __main_single_rule(
 422    out: str, single_rule: str, checkers: Optional[Sequence[CheckerInterface]]
 423) -> None:
 424    rule: Optional[idstools.rule.Rule] = idstools.rule.parse(single_rule)
 425
 426    # Verify that a rule was parsed correctly.
 427    if rule is None:
 428        msg = f"Error parsing rule from user input: {single_rule}"
 429        _logger.critical(msg)
 430        raise click.BadParameter(f"Error: {msg}")
 431
 432    if not is_valid_rule(rule):
 433        msg = f"Error parsing rule from user input: {single_rule}"
 434        _logger.critical(msg)
 435        raise click.BadParameter(f"Error: {msg}")
 436
 437    _logger.debug("Processing rule: %s", rule["sid"])
 438
 439    rule_report = analyze_rule(rule, checkers=checkers)
 440
 441    __write_output(OutputReport(rules=[rule_report]), out)
 442
 443
 444def __write_output(
 445    output: OutputReport,
 446    out: str,
 447    gitlab: bool = False,
 448    github: bool = False,
 449    rules_file: Optional[str] = None,
 450) -> None:
 451    _logger.info(
 452        "Writing output to suricata-check.jsonl and suricata-check-fast.log in %s",
 453        os.path.abspath(out),
 454    )
 455    with (
 456        open(
 457            os.path.join(out, "suricata-check.jsonl"),
 458            "w",
 459            buffering=io.DEFAULT_BUFFER_SIZE,
 460        ) as jsonl_fh,
 461        open(
 462            os.path.join(out, "suricata-check-fast.log"),
 463            "w",
 464            buffering=io.DEFAULT_BUFFER_SIZE,
 465        ) as fast_fh,
 466    ):
 467        rules: RULE_REPORTS_TYPE = output.rules
 468        jsonl_fh.write("\n".join([str(rule) for rule in rules]))
 469
 470        for rule_report in rules:
 471            rule: idstools.rule.Rule = rule_report.rule
 472            lines: str = (
 473                "{}-{}".format(rule_report.line_begin, rule_report.line_end)
 474                if rule_report.line_begin
 475                else "Unknown"
 476            )
 477            issues: ISSUES_TYPE = rule_report.issues
 478            for issue in issues:
 479                code = issue.code
 480                severity = (
 481                    logging.getLevelName(issue.severity) if issue.severity else None
 482                )
 483                issue_msg = issue.message.replace("\n", " ")
 484
 485                msg = "[{}]{} Lines {}, sid {}: {}".format(
 486                    code,
 487                    f" ({severity})" if severity else "",
 488                    lines,
 489                    rule["sid"],
 490                    issue_msg,
 491                )
 492                fast_fh.write(msg + "\n")
 493                click.secho(msg, color=True, fg="blue")
 494
 495    if output.summary is not None:
 496        __write_output_stats(output, out)
 497
 498    if gitlab:
 499        assert rules_file is not None
 500
 501        __write_output_gitlab(output, out, rules_file)
 502
 503    if github:
 504        assert rules_file is not None
 505
 506        __write_output_github(output, rules_file)
 507
 508
 509def __write_output_stats(output: OutputReport, out: str) -> None:
 510    assert output.summary is not None
 511
 512    with open(
 513        os.path.join(out, "suricata-check-stats.log"),
 514        "w",
 515        buffering=io.DEFAULT_BUFFER_SIZE,
 516    ) as stats_fh:
 517        summary: OutputSummary = output.summary
 518
 519        overall_summary: SIMPLE_SUMMARY_TYPE = summary.overall_summary
 520
 521        n_issues = overall_summary["Total Issues"]
 522        n_rules = (
 523            overall_summary["Rules with Issues"]
 524            + overall_summary["Rules without Issues"]
 525        )
 526
 527        stats_fh.write(
 528            tabulate.tabulate(
 529                (
 530                    (
 531                        k,
 532                        v,
 533                        (
 534                            "{:.0%}".format(v / n_rules)
 535                            if k.startswith("Rules ") and n_rules > 0
 536                            else "-"
 537                        ),
 538                    )
 539                    for k, v in overall_summary.items()
 540                ),
 541                headers=(
 542                    "Count",
 543                    "Percentage of Rules",
 544                ),
 545            )
 546            + "\n\n",
 547        )
 548
 549        click.secho(
 550            f"Total issues found: {overall_summary['Total Issues']}",
 551            color=True,
 552            bold=True,
 553            fg="blue",
 554        )
 555        click.secho(
 556            f"Rules with Issues found: {overall_summary['Rules with Issues']}",
 557            color=True,
 558            bold=True,
 559            fg="blue",
 560        )
 561
 562        issues_by_group: SIMPLE_SUMMARY_TYPE = summary.issues_by_group
 563
 564        stats_fh.write(
 565            tabulate.tabulate(
 566                (
 567                    (k, v, "{:.0%}".format(v / n_issues) if n_issues > 0 else "-")
 568                    for k, v in issues_by_group.items()
 569                ),
 570                headers=(
 571                    "Count",
 572                    "Percentage of Total Issues",
 573                ),
 574            )
 575            + "\n\n",
 576        )
 577
 578        issues_by_type: EXTENSIVE_SUMMARY_TYPE = summary.issues_by_type
 579        for checker, checker_issues_by_type in issues_by_type.items():
 580            stats_fh.write(" " + checker + " " + "\n")
 581            stats_fh.write("-" * (len(checker) + 2) + "\n")
 582            stats_fh.write(
 583                tabulate.tabulate(
 584                    (
 585                        (
 586                            k,
 587                            v,
 588                            "{:.0%}".format(v / n_rules) if n_rules > 0 else "-",
 589                        )
 590                        for k, v in checker_issues_by_type.items()
 591                    ),
 592                    headers=(
 593                        "Count",
 594                        "Percentage of Rules",
 595                    ),
 596                )
 597                + "\n\n",
 598            )
 599
 600
 601def __write_output_gitlab(output: OutputReport, out: str, rules_file: str) -> None:
 602    with open(
 603        os.path.join(out, "suricata-check-gitlab.json"),
 604        "w",
 605        buffering=io.DEFAULT_BUFFER_SIZE,
 606    ) as gitlab_fh:
 607        issue_dicts = []
 608        for rule_report in output.rules:
 609            line_begin: Optional[int] = rule_report.line_begin
 610            assert line_begin is not None
 611            line_end: Optional[int] = rule_report.line_end
 612            assert line_end is not None
 613            issues: ISSUES_TYPE = rule_report.issues
 614            for issue in issues:
 615                code = issue.code
 616                issue_msg = issue.message.replace("\n", " ")
 617                assert issue.checker is not None
 618                issue_checker = issue.checker
 619                issue_hash = str(issue.hash)
 620                assert issue.severity is not None
 621                issue_severity = GITLAB_SEVERITIES[issue.severity]
 622
 623                issue_dict: Mapping[
 624                    str,
 625                    Union[str, list[str], Mapping[str, Union[str, Mapping[str, int]]]],
 626                ] = {
 627                    "description": issue_msg,
 628                    "categories": [issue_checker],
 629                    "check_name": f"Suricata Check {code}",
 630                    "fingerprint": issue_hash,
 631                    "severity": issue_severity,
 632                    "location": {
 633                        "path": rules_file,
 634                        "lines": {"begin": line_begin, "end": line_end},
 635                    },
 636                }
 637                issue_dicts.append(issue_dict)
 638
 639        gitlab_fh.write(json.dumps(issue_dicts))
 640
 641
 642def __write_output_github(output: OutputReport, rules_file: str) -> None:
 643    output_lines: dict[str, list[str]] = {
 644        k: [] for k in set(GITHUB_SEVERITIES.values())
 645    }
 646    for rule_report in output.rules:
 647        line_begin: Optional[int] = rule_report.line_begin
 648        assert line_begin is not None
 649        line_end: Optional[int] = rule_report.line_end
 650        assert line_end is not None
 651        issues: ISSUES_TYPE = rule_report.issues
 652        for issue in issues:
 653            code = issue.code
 654            issue_msg = issue.message.replace("\n", " ")
 655            assert issue.checker is not None
 656            issue_checker = issue.checker
 657            assert issue.severity is not None
 658            issue_severity = GITHUB_SEVERITIES[issue.severity]
 659            title = f"{issue_checker} - {code}"
 660
 661            output_lines[issue_severity].append(
 662                GITHUB_COMMAND.format(
 663                    level=issue_severity,
 664                    file=rules_file,
 665                    line=line_begin,
 666                    end_line=line_end,
 667                    title=title,
 668                    message=issue_msg,
 669                )
 670            )
 671
 672    for message_level, lines in output_lines.items():
 673        if len(lines) > 0:
 674            print(f"::group::{message_level}")  # noqa: T201
 675            for message in lines:
 676                print(message)  # noqa: T201
 677            print("::endgroup::")  # noqa: T201
 678
 679
def process_rules_file(  # noqa: C901, PLR0912, PLR0915
    rules: str,
    evaluate_disabled: bool,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputReport:
    """Processes a rule file and returns a list of rules and their issues.

    Args:
        rules: A path to a Suricata rules file.
        evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
        checkers: The checkers to be used when processing the rule file.

    Returns:
        A list of rules and their issues.

    Raises:
        RuntimeError: If no checkers could be automatically discovered.

    """
    if checkers is None:
        checkers = get_checkers()

    output = OutputReport()

    with open(
        os.path.normpath(rules),
        buffering=io.DEFAULT_BUFFER_SIZE,
    ) as rules_fh:
        if len(checkers) == 0:
            msg = "No checkers provided for processing rules."
            _logger.error(msg)
            raise RuntimeError(msg)

        _logger.info("Processing rule file: %s", rules)

        collected_multiline_parts: Optional[str] = None
        multiline_begin_number: Optional[int] = None

        # Iterate the file lazily; line numbers are 1-based.
        for number, line in enumerate(rules_fh, start=1):
            # First work on collecting and parsing multiline rules:
            # a trailing backslash continues the rule on the next line.
            if line.rstrip("\r\n").endswith("\\"):
                multiline_part = line.rstrip("\r\n")[:-1]

                if collected_multiline_parts is None:
                    collected_multiline_parts = multiline_part
                    multiline_begin_number = number
                else:
                    collected_multiline_parts += multiline_part.lstrip()

                continue

            # Process final part of multiline rule if one is being collected
            if collected_multiline_parts is not None:
                collected_multiline_parts += line.lstrip()

                rule_line = collected_multiline_parts.strip()

                collected_multiline_parts = None
            # If no multiline rule is being collected process as a potential single line rule
            else:
                if len(line.strip()) == 0:
                    continue

                if line.strip().startswith("#"):
                    if evaluate_disabled:
                        # Verify that this line is a rule and not a comment
                        if idstools.rule.parse(line) is None:
                            # Log the comment since it may be a invalid rule
                            _logger.warning(
                                "Ignoring comment on line %i: %s", number, line
                            )
                            continue
                    else:
                        # Skip the rule
                        continue

                rule_line = line.strip()

            # Determine where this rule started and reset the marker right away,
            # so that a multiline rule that fails to parse below cannot leak its
            # first line number into the report of a later rule.
            line_begin = multiline_begin_number or number
            multiline_begin_number = None

            try:
                rule: Optional[idstools.rule.Rule] = idstools.rule.parse(rule_line)
            except Exception:  # noqa: BLE001
                _logger.error(
                    "Internal error in idstools parsing rule on line %i: %s",
                    number,
                    rule_line,
                )
                rule = None

            # Parse comment and potential ignore comment to ignore rules
            ignore = __parse_type_ignore(rule)

            # Verify that a rule was parsed correctly.
            if rule is None:
                _logger.error("Error parsing rule on line %i: %s", number, rule_line)
                continue

            if not is_valid_rule(rule):
                _logger.error("Invalid rule on line %i: %s", number, rule_line)
                continue

            _logger.debug("Processing rule: %s on line %i", rule["sid"], number)

            rule_report: RuleReport = analyze_rule(
                rule,
                checkers=checkers,
                ignore=ignore,
            )
            rule_report.line_begin = line_begin
            rule_report.line_end = number

            output.rules.append(rule_report)

    _logger.info("Completed processing rule file: %s", rules)

    output.summary = __summarize_output(output, checkers)

    return output
801 802 803def __is_valid_idstools_rule(text: str) -> bool: 804 try: 805 rule: Optional[idstools.rule.Rule] = idstools.rule.parse(text) 806 except Exception: # noqa: BLE001 807 return False 808 809 if rule is None: 810 return False 811 812 return True 813 814 815def __parse_type_ignore(rule: Optional[idstools.rule.Rule]) -> Optional[Sequence[str]]: 816 if rule is None: 817 return None 818 819 ignore_value = get_rule_suboption(rule, "metadata", "suricata-check") 820 if ignore_value is None: 821 return [] 822 823 return ignore_value.strip(' "').split(",") 824 825 826def _import_extensions() -> None: 827 global suricata_check_extensions_imported # noqa: PLW0603 828 if suricata_check_extensions_imported is True: 829 return 830 831 for module in pkgutil.iter_modules(): 832 if module.name.startswith("suricata_check_"): 833 try: 834 imported_module = __import__(module.name) 835 _logger.info( 836 "Detected and successfully imported suricata-check extension %s with version %s.", 837 module.name.replace("_", "-"), 838 getattr(imported_module, "__version__"), 839 ) 840 except ImportError: 841 _logger.warning( 842 "Detected potential suricata-check extension %s but failed to import it.", 843 module.name.replace("_", "-"), 844 ) 845 suricata_check_extensions_imported = True 846 847
@lru_cache(maxsize=1)
def get_checkers(
    include: Sequence[str] = (".*",),
    exclude: Sequence[str] = (),
    issue_severity: int = logging.INFO,
) -> Sequence[CheckerInterface]:
    """Auto discovers all available checkers that implement the CheckerInterface.

    Args:
        include: Regular expressions matching the checker codes to enable.
        exclude: Regular expressions matching the checker codes to disable.
        issue_severity: The minimum severity a checker code must have to be enabled.

    Returns:
        A list of available checkers that implement the CheckerInterface.

    """
    # Check for extensions and try to import them
    _import_extensions()

    enabled_checkers: list[CheckerInterface] = []
    for checker_cls in get_all_subclasses(CheckerInterface):
        # The dummy base class itself is never a usable checker.
        if checker_cls.__name__ == DummyChecker.__name__:
            continue

        # Initialize DummyCheckers to retrieve error messages.
        if issubclass(checker_cls, DummyChecker):
            checker_cls()

        is_enabled, codes = __get_checker_enabled(
            checker_cls, include, exclude, issue_severity
        )

        if not is_enabled:
            _logger.info("Checker %s is disabled.", checker_cls.__name__)
            continue

        enabled_checkers.append(checker_cls(include=codes))

    enabled_names = [instance.__class__.__name__ for instance in enabled_checkers]
    _logger.info(
        "Discovered and enabled checkers: [%s]",
        ", ".join(enabled_names),
    )
    if len(enabled_checkers) == 0:
        _logger.warning(
            "No checkers were enabled. Check the include and exclude arguments."
        )

    # Perform a uniqueness check on the codes emitted by the checkers
    for first in enabled_checkers:
        for second in enabled_checkers:
            if first == second:
                continue
            if set(first.codes).isdisjoint(second.codes):
                continue
            msg = f"Checker {first.__class__.__name__} and {second.__class__.__name__} have overlapping codes."
            _logger.error(msg)

    # Sort by class name for deterministic output.
    return sorted(enabled_checkers, key=lambda instance: instance.__class__.__name__)


def __get_checker_enabled(
    checker: type[CheckerInterface],
    include: Sequence[str],
    exclude: Sequence[str],
    issue_severity: int,
) -> tuple[bool, set[str]]:
    """Determines whether a checker is enabled and which of its codes are relevant.

    Args:
        checker: The checker class to evaluate.
        include: Regular expressions matching the checker codes to enable.
        exclude: Regular expressions matching the checker codes to disable.
        issue_severity: The minimum severity a code must have to be relevant.

    Returns:
        A tuple of a flag indicating whether the checker is enabled,
        and the set of codes of the checker that remain relevant.

    """
    enabled = checker.enabled_by_default

    # If no include regexes are provided, include all by default
    if len(include) == 0:
        relevant_codes = set(checker.codes.keys())
    else:
        # If include regexes are provided, include all codes that match any of these regexes.
        # Each regex is compiled once rather than once per checker code.
        include_patterns = [
            _regex_provider.compile("^" + regex + "$") for regex in include
        ]
        relevant_codes = {
            code
            for code in checker.codes.keys()
            if any(pattern.match(code) is not None for pattern in include_patterns)
        }

        # An explicit include overrides a checker being disabled by default.
        if len(relevant_codes) > 0:
            enabled = True

    # Now remove the codes that are excluded according to any of the provided exclude regexes
    exclude_patterns = [
        _regex_provider.compile("^" + regex + "$") for regex in exclude
    ]
    relevant_codes = {
        code
        for code in relevant_codes
        if all(pattern.match(code) is None for pattern in exclude_patterns)
    }

    # Now filter out irrelevant codes based on severity
    relevant_codes = {
        code
        for code in relevant_codes
        if checker.codes[code]["severity"] >= issue_severity
    }

    # A checker without any relevant code left has nothing to report.
    if len(relevant_codes) == 0:
        enabled = False

    return enabled, relevant_codes


def analyze_rule(
    rule: idstools.rule.Rule,
    checkers: Optional[Sequence[CheckerInterface]] = None,
    ignore: Optional[Sequence[str]] = None,
) -> RuleReport:
    """Checks a rule and returns a report containing the rule and the issues found.

    Args:
        rule: The rule to be checked.
        checkers: The checkers to be used to check the rule.
            If not provided, the checkers returned by `get_checkers()` are used.
        ignore: Regular expressions to match checker codes to ignore.
            Codes are tested with `match`, i.e., anchored at the start of the code only.

    Returns:
        A report containing the rule, the issues found by the checkers,
        and a summary of those issues.

    Raises:
        InvalidRuleError: If the rule does not follow the Suricata syntax.

    """
    if not is_valid_rule(rule):
        raise InvalidRuleError(rule["raw"])

    check_rule_option_recognition(rule)

    if checkers is None:
        checkers = get_checkers()

    rule_report: RuleReport = RuleReport(rule=rule)

    # Diagnostic only: logging this at WARNING level would emit a warning
    # for every analyzed rule.
    _logger.debug("Ignoring issue codes matching: %s", ignore)

    compiled_ignore = (
        [_regex_provider.compile(r) for r in ignore] if ignore is not None else []
    )

    for checker in checkers:
        # A failing checker must not prevent the other checkers from running,
        # so failures are logged and the checker is skipped for this rule.
        try:
            issues = checker.check_rule(rule)
            if compiled_ignore:
                issues = [
                    issue
                    for issue in issues
                    if all(r.match(issue.code) is None for r in compiled_ignore)
                ]
            rule_report.add_issues(issues)
        except Exception as exception:  # noqa: BLE001
            _logger.warning(
                "Failed to run %s on rule: %s",
                checker.__class__.__name__,
                rule["raw"],
                extra={"exception": exception},
            )

    rule_report.summary = __summarize_rule(rule_report, checkers)

    return rule_report
def __summarize_rule(
    rule: RuleReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> RULE_SUMMARY_TYPE:
    """Summarizes the issues found in a rule.

    Args:
        rule: The rule report to be summarized.
        checkers: The checkers to be used to check the rule.

    Returns:
        A dictionary containing a summary of all issues found in the rule.

    """
    if checkers is None:
        checkers = get_checkers()

    issues: ISSUES_TYPE = rule.issues

    issues_by_group = defaultdict(int)
    for issue in issues:
        # Issues without a checker cannot be attributed to a group. Skipping them
        # mirrors the file-level summaries and avoids a `None` key, which cannot
        # be sorted together with the checker names below.
        if issue.checker is not None:
            issues_by_group[issue.checker] += 1

    # Ensure also checkers without issues are included in the report.
    for checker in checkers:
        issues_by_group.setdefault(checker.__class__.__name__, 0)

    return {
        "total_issues": len(issues),
        # Sort dictionaries for deterministic output
        "issues_by_group": __sort_mapping(issues_by_group),
    }


def __summarize_output(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputSummary:
    """Summarizes the issues found in a rules file.

    Args:
        output: The unsummarized output of the rules file containing all rules and their issues.
        checkers: The checkers to be used to check the rule.

    Returns:
        A summary of all issues found in the rules file.

    """
    if checkers is None:
        checkers = get_checkers()

    return OutputSummary(
        overall_summary=__get_overall_summary(output),
        issues_by_group=__get_issues_by_group(output, checkers),
        issues_by_type=__get_issues_by_type(output, checkers),
    )


def __get_overall_summary(
    output: OutputReport,
) -> SIMPLE_SUMMARY_TYPE:
    """Counts the total number of issues and the rules with and without issues.

    Args:
        output: The output of the rules file containing all rules and their issues.

    Returns:
        A dictionary with the keys "Total Issues", "Rules with Issues"
        and "Rules without Issues".

    """
    overall_summary = {
        "Total Issues": 0,
        "Rules with Issues": 0,
        "Rules without Issues": 0,
    }

    rules: RULE_REPORTS_TYPE = output.rules
    for rule in rules:
        issues: ISSUES_TYPE = rule.issues
        overall_summary["Total Issues"] += len(issues)

        if len(issues) == 0:
            overall_summary["Rules without Issues"] += 1
        else:
            overall_summary["Rules with Issues"] += 1

    return overall_summary


def __get_issues_by_group(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> SIMPLE_SUMMARY_TYPE:
    """Counts the issues per checker over all rules in the output.

    Args:
        output: The output of the rules file containing all rules and their issues.
        checkers: The checkers to be listed in the summary, also when they found no issues.

    Returns:
        A dictionary mapping checker names to issue counts, sorted by checker name.

    """
    if checkers is None:
        checkers = get_checkers()

    issues_by_group = defaultdict(int)

    # Ensure also checkers and codes without issues are included in the report.
    for checker in checkers:
        issues_by_group[checker.__class__.__name__] = 0

    rules: RULE_REPORTS_TYPE = output.rules
    for rule in rules:
        issues: ISSUES_TYPE = rule.issues

        for issue in issues:
            checker = issue.checker
            if checker is not None:
                issues_by_group[checker] += 1

    return __sort_mapping(issues_by_group)


def __get_issues_by_type(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> EXTENSIVE_SUMMARY_TYPE:
    """Counts the issues per checker and per issue code over all rules in the output.

    Args:
        output: The output of the rules file containing all rules and their issues.
        checkers: The checkers whose codes are listed in the summary, also when unused.

    Returns:
        A dictionary mapping checker names to dictionaries mapping issue codes
        to issue counts. Both levels are sorted by key.

    """
    if checkers is None:
        checkers = get_checkers()
    issues_by_type: EXTENSIVE_SUMMARY_TYPE = defaultdict(lambda: defaultdict(int))

    # Ensure also checkers and codes without issues are included in the report.
    for checker in checkers:
        for code in checker.codes:
            issues_by_type[checker.__class__.__name__][code] = 0

    rules: RULE_REPORTS_TYPE = output.rules
    for rule in rules:
        issues: ISSUES_TYPE = rule.issues

        for issue in issues:
            checker = issue.checker
            if checker is not None:
                issues_by_type[checker][issue.code] += 1

    # Sort both levels for deterministic output
    return __sort_mapping(
        {group: __sort_mapping(codes) for group, codes in issues_by_type.items()}
    )


def __sort_mapping(mapping: Mapping) -> dict:
    """Returns a plain dictionary with the items of the mapping ordered by key."""
    return {key: mapping[key] for key in sorted(mapping.keys())}


if __name__ == "__main__":
    main()