Source code for suricata_check.suricata_check

   1"""The `suricata_check.suricata_check` module contains the command line utility and the main program logic."""
   2
   3import atexit
   4import configparser
   5import io
   6import json
   7import logging
   8import logging.handlers
   9import multiprocessing
  10import os
  11import pkgutil
  12import sys
  13from collections import defaultdict
  14from collections.abc import Mapping, Sequence
  15from functools import lru_cache
  16from typing import (
  17    Any,
  18    Literal,
  19    Optional,
  20    TypeVar,
  21    Union,
  22    overload,
  23)
  24
  25import click
  26import idstools.rule
  27import tabulate
  28
  29# Add suricata-check to the front of the PATH, such that the version corresponding to the CLI is used.
  30_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
  31if sys.path[0] != _suricata_check_path:
  32    sys.path.insert(0, _suricata_check_path)
  33
  34from suricata_check import __version__, get_dependency_versions  # noqa: E402
  35from suricata_check.checkers.interface import CheckerInterface  # noqa: E402
  36from suricata_check.checkers.interface.dummy import DummyChecker  # noqa: E402
  37from suricata_check.utils._click import ClickHandler, ClickHelpOption  # noqa: E402
  38from suricata_check.utils._path import find_rules_file  # noqa: E402
  39from suricata_check.utils.checker import (  # noqa: E402
  40    check_rule_option_recognition,
  41    get_rule_suboption,
  42)
  43from suricata_check.utils.checker_typing import (  # noqa: E402
  44    EXTENSIVE_SUMMARY_TYPE,
  45    ISSUES_TYPE,
  46    RULE_REPORTS_TYPE,
  47    RULE_SUMMARY_TYPE,
  48    SIMPLE_SUMMARY_TYPE,
  49    InvalidRuleError,
  50    OutputReport,
  51    OutputSummary,
  52    RuleReport,
  53    get_all_subclasses,
  54)
  55from suricata_check.utils.regex import get_regex_provider, is_valid_rule  # noqa: E402
  56
  57LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
  58LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
  59GITLAB_SEVERITIES = {
  60    logging.DEBUG: "info",
  61    logging.INFO: "info",
  62    logging.WARNING: "minor",
  63    logging.ERROR: "major",
  64    logging.CRITICAL: "critical",
  65}
  66GITHUB_SEVERITIES = {
  67    logging.DEBUG: "debug",
  68    logging.INFO: "notice",
  69    logging.WARNING: "warning",
  70    logging.ERROR: "error",
  71    logging.CRITICAL: "error",
  72}
  73GITHUB_COMMAND = (
  74    "::{level} file={file},line={line},endLine={end_line},title={title}::{message}"
  75)
  76
  77_logger = logging.getLogger(__name__)
  78
  79_regex_provider = get_regex_provider()
  80
  81# Global variable to check if extensions have already been imported in case get_checkers() is called multiple times.
  82suricata_check_extensions_imported = False
  83
  84
  85@click.command()
  86@click.option(
  87    "--ini",
  88    "-i",
  89    help="Path to suricata-check.ini file to read configuration from.",
  90    show_default=True,
  91)
  92@click.option(
  93    "--rules",
  94    "-r",
  95    help="Path to Suricata rules to provide check on.",
  96    show_default=True,
  97)
  98@click.option(
  99    "--single-rule",
 100    "-s",
 101    help="A single Suricata rule to be checked",
 102    show_default=False,
 103)
 104@click.option(
 105    "--out",
 106    "-o",
 107    help="Path to suricata-check output folder.",
 108    show_default=True,
 109)
 110@click.option(
 111    "--log-level",
 112    help=f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
 113    show_default=True,
 114)
 115@click.option(
 116    "--gitlab",
 117    help="Flag to create CodeClimate output report for GitLab CI/CD.",
 118    show_default=True,
 119    is_flag=True,
 120)
 121@click.option(
 122    "--github",
 123    help="Flag to write workflow commands to stdout for GitHub CI/CD.",
 124    show_default=True,
 125    is_flag=True,
 126)
 127@click.option(
 128    "--evaluate-disabled",
 129    help="Flag to evaluate disabled rules.",
 130    show_default=True,
 131    is_flag=True,
 132)
 133@click.option(
 134    "--issue-severity",
 135    help=f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
 136    show_default=True,
 137)
 138@click.option(
 139    "--include-all",
 140    "-a",
 141    help="Flag to indicate all checker codes should be enabled.",
 142    show_default=True,
 143    is_flag=True,
 144)
 145@click.option(
 146    "--include",
 147    "-i",
 148    help="List of all checker codes to enable.",
 149    show_default=True,
 150    multiple=True,
 151)
 152@click.option(
 153    "--exclude",
 154    "-e",
 155    help="List of all checker codes to disable.",
 156    show_default=True,
 157    multiple=True,
 158)
 159@click.help_option("-h", "--help", cls=ClickHelpOption)
 160def main(  # noqa: PLR0915
 161    **kwargs: dict[str, Any],
 162) -> None:
 163    """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.
 164
 165    Raises:
 166      BadParameter: If provided arguments are invalid.
 167
 168      RuntimeError: If no checkers could be automatically discovered.
 169
 170    """
 171    # Look for a ini file and parse it.
 172    ini_kwargs = __get_ini_kwargs(
 173        str(kwargs["ini"]) if kwargs["ini"] is not None else None  # type: ignore reportUnnecessaryComparison
 174    )
 175
 176    # Verify CLI argument types and get CLI arguments or use default arguments
 177    rules: str = __get_verified_kwarg([kwargs, ini_kwargs], "rules", str, False, ".")
 178    single_rule: Optional[str] = __get_verified_kwarg(
 179        [kwargs, ini_kwargs], "single_rule", str, True, None
 180    )
 181    out: str = __get_verified_kwarg([kwargs, ini_kwargs], "out", str, False, ".")
 182    log_level: LogLevel = __get_verified_kwarg(
 183        [kwargs, ini_kwargs], "log_level", str, False, "DEBUG"
 184    )
 185    gitlab: bool = __get_verified_kwarg(
 186        [kwargs, ini_kwargs], "gitlab", bool, False, False
 187    )
 188    github: bool = __get_verified_kwarg(
 189        [kwargs, ini_kwargs], "github", bool, False, False
 190    )
 191    evaluate_disabled: bool = __get_verified_kwarg(
 192        [kwargs, ini_kwargs], "evaluate_disabled", bool, False, False
 193    )
 194    issue_severity: LogLevel = __get_verified_kwarg(
 195        [kwargs, ini_kwargs], "issue_severity", str, False, "INFO"
 196    )
 197    include_all: bool = __get_verified_kwarg(
 198        [kwargs, ini_kwargs], "include_all", bool, False, False
 199    )
 200    include: tuple[str, ...] = __get_verified_kwarg(
 201        [kwargs, ini_kwargs], "include", tuple, False, ()
 202    )
 203    exclude: tuple[str, ...] = __get_verified_kwarg(
 204        [kwargs, ini_kwargs], "exclude", tuple, False, ()
 205    )
 206
 207    # Verify that out argument is valid
 208    if os.path.exists(out) and not os.path.isdir(out):
 209        raise click.BadParameter(f"Error: {out} is not a directory.")
 210
 211    # Verify that log_level argument is valid
 212    if log_level not in LOG_LEVELS:
 213        raise click.BadParameter(f"Error: {log_level} is not a valid log level.")
 214
 215    # Create out directory if non-existent
 216    if not os.path.exists(out):
 217        os.makedirs(out)
 218
 219    # Setup logging from a seperate thread
 220    queue = multiprocessing.Manager().Queue()
 221    queue_handler = logging.handlers.QueueHandler(queue)
 222
 223    click_handler = ClickHandler(
 224        github=github, github_level=getattr(logging, log_level)
 225    )
 226    logging.basicConfig(
 227        level=log_level,
 228        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 229        handlers=(queue_handler, click_handler),
 230        force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", False) == "TRUE",
 231    )
 232
 233    file_handler = logging.FileHandler(
 234        filename=os.path.join(out, "suricata-check.log"),
 235        delay=True,
 236    )
 237    queue_listener = logging.handlers.QueueListener(
 238        queue,
 239        file_handler,
 240        respect_handler_level=True,
 241    )
 242
 243    def _at_exit() -> None:
 244        """Cleans up logging listener and handlers before exiting."""
 245        queue_listener.enqueue_sentinel()
 246        queue_listener.stop()
 247        file_handler.flush()
 248        file_handler.close()
 249        atexit.unregister(_at_exit)
 250
 251    atexit.register(_at_exit)
 252
 253    queue_listener.start()
 254
 255    # Log the arguments:
 256    _logger.info("Running suricata-check with the following arguments:")
 257    _logger.info("out: %s", out)
 258    _logger.info("rules: %s", rules)
 259    _logger.info("single_rule: %s", single_rule)
 260    _logger.info("log_level: %s", log_level)
 261    _logger.info("gitlab: %s", gitlab)
 262    _logger.info("github: %s", github)
 263    _logger.info("evaluate_disabled: %s", evaluate_disabled)
 264    _logger.info("issue_severity: %s", issue_severity)
 265    _logger.info("include_all: %s", include_all)
 266    _logger.info("include: %s", include)
 267    _logger.info("exclude: %s", exclude)
 268
 269    # Log the environment:
 270    _logger.debug("Platform: %s", sys.platform)
 271    _logger.debug("Python version: %s", sys.version)
 272    _logger.debug("suricata-check path: %s", _suricata_check_path)
 273    _logger.debug("suricata-check version: %s", __version__)
 274    for package, version in get_dependency_versions().items():
 275        _logger.debug("Dependency %s version: %s", package, version)
 276
 277    # Verify that include and exclude arguments are valid
 278    if include_all and len(include) > 0:
 279        raise click.BadParameter(
 280            "Error: Cannot use --include-all and --include together."
 281        )
 282    if include_all:
 283        include = (".*",)
 284
 285    # Verify that issue_severity argument is valid
 286    if issue_severity not in LOG_LEVELS:
 287        raise click.BadParameter(
 288            f"Error: {issue_severity} is not a valid issue severity or log level."
 289        )
 290
 291    checkers = get_checkers(
 292        include, exclude, issue_severity=getattr(logging, issue_severity)
 293    )
 294
 295    if single_rule is not None:
 296        __main_single_rule(out, single_rule, checkers)
 297
 298        # Return here so no rules file is processed.
 299        _at_exit()
 300        return
 301
 302    # Check if the rules argument is valid and find the rules file
 303    rules = find_rules_file(rules)
 304
 305    output = process_rules_file(rules, evaluate_disabled, checkers=checkers)
 306
 307    __write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)
 308
 309    _at_exit()
 310
 311
 312def __get_ini_kwargs(path: Optional[str]) -> dict[str, Any]:  # noqa: C901, PLR0912
 313    ini_kwargs: dict[str, Any] = {}
 314    if path is not None:
 315        if not os.path.exists(path):
 316            raise click.BadParameter(
 317                f"Error: INI file provided in {path} but no options loaded"
 318            )
 319
 320    # Use the default path if no path was provided
 321    if path is None:
 322        path = "suricata-check.ini"
 323        if not os.path.exists(path):
 324            return {}
 325
 326    config_parser = configparser.ConfigParser(
 327        empty_lines_in_values=False,
 328        default_section="suricata-check",
 329        converters={"tuple": lambda x: tuple(json.loads(x))},
 330    )
 331    config_parser.read(path)
 332    ini_kwargs = {}
 333
 334    if config_parser.has_option("suricata-check", "rules"):
 335        ini_kwargs["rules"] = config_parser.get("suricata-check", "rules")
 336    if config_parser.has_option("suricata-check", "out"):
 337        ini_kwargs["out"] = config_parser.get("suricata-check", "out")
 338    if config_parser.has_option("suricata-check", "log"):
 339        ini_kwargs["log"] = config_parser.get("suricata-check", "log")
 340    if config_parser.has_option("suricata-check", "gitlab"):
 341        ini_kwargs["gitlab"] = config_parser.getboolean("suricata-check", "gitlab")
 342    if config_parser.has_option("suricata-check", "github"):
 343        ini_kwargs["github"] = config_parser.getboolean("suricata-check", "github")
 344    if config_parser.has_option("suricata-check", "evaluate_disabled"):
 345        ini_kwargs["evaluate_disabled"] = config_parser.getboolean(
 346            "suricata-check", "evaluate_disabled"
 347        )
 348    if config_parser.has_option("suricata-check", "issue-severity"):
 349        ini_kwargs["issue_severity"] = config_parser.get(
 350            "suricata-check", "issue-severity"
 351        )
 352    if config_parser.has_option("suricata-check", "include-all"):
 353        ini_kwargs["include_all"] = config_parser.getboolean(
 354            "suricata-check", "include-all"
 355        )
 356    if config_parser.has_option("suricata-check", "include"):
 357        ini_kwargs["include"] = config_parser.gettuple("suricata-check", "include")  # type: ignore reportAttributeAccessIssue
 358    if config_parser.has_option("suricata-check", "exclude"):
 359        ini_kwargs["exclude"] = config_parser.gettuple("suricata-check", "exclude")  # type: ignore reportAttributeAccessIssue
 360
 361    return ini_kwargs
 362
 363
 364D = TypeVar("D")
 365
 366
 367@overload
 368def __get_verified_kwarg(
 369    kwargss: Sequence[dict[str, Any]],
 370    name: str,
 371    expected_type: type,
 372    optional: Literal[True],
 373    default: D,
 374) -> Optional[D]:
 375    pass
 376
 377
 378@overload
 379def __get_verified_kwarg(
 380    kwargss: Sequence[dict[str, Any]],
 381    name: str,
 382    expected_type: type,
 383    optional: Literal[False],
 384    default: D,
 385) -> D:
 386    pass
 387
 388
 389def __get_verified_kwarg(
 390    kwargss: Sequence[dict[str, Any]],
 391    name: str,
 392    expected_type: type,
 393    optional: bool,
 394    default: D,
 395) -> Optional[D]:
 396    for kwargs in kwargss:
 397        if name in kwargs:
 398            if kwargs[name] is None:
 399                if optional and default is not None:
 400                    return None
 401                return default
 402
 403            if kwargs[name] is not default:
 404                if not isinstance(kwargs[name], expected_type):
 405                    raise click.BadParameter(
 406                        f"""Error: \
 407                Argument `{name}` should have a value of type `{expected_type}` \
 408                but has value {kwargs[name]} of type {kwargs[name].__class__} instead."""
 409                    )
 410                return kwargs[name]
 411
 412    return default
 413
 414
 415def __main_single_rule(
 416    out: str, single_rule: str, checkers: Optional[Sequence[CheckerInterface]]
 417) -> None:
 418    rule: Optional[idstools.rule.Rule] = idstools.rule.parse(single_rule)
 419
 420    # Verify that a rule was parsed correctly.
 421    if rule is None:
 422        msg = f"Error parsing rule from user input: {single_rule}"
 423        _logger.critical(msg)
 424        raise click.BadParameter(f"Error: {msg}")
 425
 426    if not is_valid_rule(rule):
 427        msg = f"Error parsing rule from user input: {single_rule}"
 428        _logger.critical(msg)
 429        raise click.BadParameter(f"Error: {msg}")
 430
 431    _logger.debug("Processing rule: %s", rule["sid"])
 432
 433    rule_report = analyze_rule(rule, checkers=checkers)
 434
 435    __write_output(OutputReport(rules=[rule_report]), out)
 436
 437
 438def __write_output(
 439    output: OutputReport,
 440    out: str,
 441    gitlab: bool = False,
 442    github: bool = False,
 443    rules_file: Optional[str] = None,
 444) -> None:
 445    _logger.info(
 446        "Writing output to suricata-check.jsonl and suricata-check-fast.log in %s",
 447        os.path.abspath(out),
 448    )
 449    with (
 450        open(
 451            os.path.join(out, "suricata-check.jsonl"),
 452            "w",
 453            buffering=io.DEFAULT_BUFFER_SIZE,
 454        ) as jsonl_fh,
 455        open(
 456            os.path.join(out, "suricata-check-fast.log"),
 457            "w",
 458            buffering=io.DEFAULT_BUFFER_SIZE,
 459        ) as fast_fh,
 460    ):
 461        rules: RULE_REPORTS_TYPE = output.rules
 462        jsonl_fh.write("\n".join([str(rule) for rule in rules]))
 463
 464        for rule_report in rules:
 465            rule: idstools.rule.Rule = rule_report.rule
 466            lines: str = (
 467                "{}-{}".format(rule_report.line_begin, rule_report.line_end)
 468                if rule_report.line_begin
 469                else "Unknown"
 470            )
 471            issues: ISSUES_TYPE = rule_report.issues
 472            for issue in issues:
 473                code = issue.code
 474                severity = (
 475                    logging.getLevelName(issue.severity) if issue.severity else None
 476                )
 477                issue_msg = issue.message.replace("\n", " ")
 478
 479                msg = "[{}]{} Lines {}, sid {}: {}".format(
 480                    code,
 481                    f" ({severity})" if severity else "",
 482                    lines,
 483                    rule["sid"],
 484                    issue_msg,
 485                )
 486                fast_fh.write(msg + "\n")
 487                click.secho(msg, color=True, fg="blue")
 488
 489    if output.summary is not None:
 490        __write_output_stats(output, out)
 491
 492    if gitlab:
 493        assert rules_file is not None
 494
 495        __write_output_gitlab(output, out, rules_file)
 496
 497    if github:
 498        assert rules_file is not None
 499
 500        __write_output_github(output, rules_file)
 501
 502
 503def __write_output_stats(output: OutputReport, out: str) -> None:
 504    assert output.summary is not None
 505
 506    with open(
 507        os.path.join(out, "suricata-check-stats.log"),
 508        "w",
 509        buffering=io.DEFAULT_BUFFER_SIZE,
 510    ) as stats_fh:
 511        summary: OutputSummary = output.summary
 512
 513        overall_summary: SIMPLE_SUMMARY_TYPE = summary.overall_summary
 514
 515        n_issues = overall_summary["Total Issues"]
 516        n_rules = (
 517            overall_summary["Rules with Issues"]
 518            + overall_summary["Rules without Issues"]
 519        )
 520
 521        stats_fh.write(
 522            tabulate.tabulate(
 523                (
 524                    (
 525                        k,
 526                        v,
 527                        (
 528                            "{:.0%}".format(v / n_rules)
 529                            if k.startswith("Rules ") and n_rules > 0
 530                            else "-"
 531                        ),
 532                    )
 533                    for k, v in overall_summary.items()
 534                ),
 535                headers=(
 536                    "Count",
 537                    "Percentage of Rules",
 538                ),
 539            )
 540            + "\n\n",
 541        )
 542
 543        click.secho(
 544            f"Total issues found: {overall_summary['Total Issues']}",
 545            color=True,
 546            bold=True,
 547            fg="blue",
 548        )
 549        click.secho(
 550            f"Rules with Issues found: {overall_summary['Rules with Issues']}",
 551            color=True,
 552            bold=True,
 553            fg="blue",
 554        )
 555
 556        issues_by_group: SIMPLE_SUMMARY_TYPE = summary.issues_by_group
 557
 558        stats_fh.write(
 559            tabulate.tabulate(
 560                (
 561                    (k, v, "{:.0%}".format(v / n_issues) if n_issues > 0 else "-")
 562                    for k, v in issues_by_group.items()
 563                ),
 564                headers=(
 565                    "Count",
 566                    "Percentage of Total Issues",
 567                ),
 568            )
 569            + "\n\n",
 570        )
 571
 572        issues_by_type: EXTENSIVE_SUMMARY_TYPE = summary.issues_by_type
 573        for checker, checker_issues_by_type in issues_by_type.items():
 574            stats_fh.write(" " + checker + " " + "\n")
 575            stats_fh.write("-" * (len(checker) + 2) + "\n")
 576            stats_fh.write(
 577                tabulate.tabulate(
 578                    (
 579                        (
 580                            k,
 581                            v,
 582                            "{:.0%}".format(v / n_rules) if n_rules > 0 else "-",
 583                        )
 584                        for k, v in checker_issues_by_type.items()
 585                    ),
 586                    headers=(
 587                        "Count",
 588                        "Percentage of Rules",
 589                    ),
 590                )
 591                + "\n\n",
 592            )
 593
 594
 595def __write_output_gitlab(output: OutputReport, out: str, rules_file: str) -> None:
 596    with open(
 597        os.path.join(out, "suricata-check-gitlab.json"),
 598        "w",
 599        buffering=io.DEFAULT_BUFFER_SIZE,
 600    ) as gitlab_fh:
 601        issue_dicts = []
 602        for rule_report in output.rules:
 603            line_begin: Optional[int] = rule_report.line_begin
 604            assert line_begin is not None
 605            line_end: Optional[int] = rule_report.line_end
 606            assert line_end is not None
 607            issues: ISSUES_TYPE = rule_report.issues
 608            for issue in issues:
 609                code = issue.code
 610                issue_msg = issue.message.replace("\n", " ")
 611                assert issue.checker is not None
 612                issue_checker = issue.checker
 613                issue_hash = str(issue.hash)
 614                assert issue.severity is not None
 615                issue_severity = GITLAB_SEVERITIES[issue.severity]
 616
 617                issue_dict: Mapping[
 618                    str,
 619                    Union[str, list[str], Mapping[str, Union[str, Mapping[str, int]]]],
 620                ] = {
 621                    "description": issue_msg,
 622                    "categories": [issue_checker],
 623                    "check_name": f"Suricata Check {code}",
 624                    "fingerprint": issue_hash,
 625                    "severity": issue_severity,
 626                    "location": {
 627                        "path": rules_file,
 628                        "lines": {"begin": line_begin, "end": line_end},
 629                    },
 630                }
 631                issue_dicts.append(issue_dict)
 632
 633        gitlab_fh.write(json.dumps(issue_dicts))
 634
 635
 636def __write_output_github(output: OutputReport, rules_file: str) -> None:
 637    output_lines: dict[str, list[str]] = {
 638        k: [] for k in set(GITHUB_SEVERITIES.values())
 639    }
 640    for rule_report in output.rules:
 641        line_begin: Optional[int] = rule_report.line_begin
 642        assert line_begin is not None
 643        line_end: Optional[int] = rule_report.line_end
 644        assert line_end is not None
 645        issues: ISSUES_TYPE = rule_report.issues
 646        for issue in issues:
 647            code = issue.code
 648            issue_msg = issue.message.replace("\n", " ")
 649            assert issue.checker is not None
 650            issue_checker = issue.checker
 651            assert issue.severity is not None
 652            issue_severity = GITHUB_SEVERITIES[issue.severity]
 653            title = f"{issue_checker} - {code}"
 654
 655            output_lines[issue_severity].append(
 656                GITHUB_COMMAND.format(
 657                    level=issue_severity,
 658                    file=rules_file,
 659                    line=line_begin,
 660                    end_line=line_end,
 661                    title=title,
 662                    message=issue_msg,
 663                )
 664            )
 665
 666    for message_level, lines in output_lines.items():
 667        if len(lines) > 0:
 668            print(f"::group::{message_level}")  # noqa: T201
 669            for message in lines:
 670                print(message)  # noqa: T201
 671            print("::endgroup::")  # noqa: T201
 672
 673
[docs] 674def process_rules_file( # noqa: C901, PLR0912, PLR0915 675 rules: str, 676 evaluate_disabled: bool, 677 checkers: Optional[Sequence[CheckerInterface]] = None, 678) -> OutputReport: 679 """Processes a rule file and returns a list of rules and their issues. 680 681 Args: 682 rules: A path to a Suricata rules file. 683 evaluate_disabled: A flag indicating whether disabled rules should be evaluated. 684 checkers: The checkers to be used when processing the rule file. 685 686 Returns: 687 A list of rules and their issues. 688 689 Raises: 690 RuntimeError: If no checkers could be automatically discovered. 691 692 """ 693 if checkers is None: 694 checkers = get_checkers() 695 696 output = OutputReport() 697 698 with ( 699 open( 700 os.path.normpath(rules), 701 buffering=io.DEFAULT_BUFFER_SIZE, 702 ) as rules_fh, 703 ): 704 if len(checkers) == 0: 705 msg = "No checkers provided for processing rules." 706 _logger.error(msg) 707 raise RuntimeError(msg) 708 709 _logger.info("Processing rule file: %s", rules) 710 711 collected_multiline_parts: Optional[str] = None 712 multiline_begin_number: Optional[int] = None 713 714 for number, line in enumerate(rules_fh.readlines(), start=1): 715 # First work on collecting and parsing multiline rules 716 if line.rstrip("\r\n").endswith("\\"): 717 multiline_part = line.rstrip("\r\n")[:-1] 718 719 if collected_multiline_parts is None: 720 collected_multiline_parts = multiline_part 721 multiline_begin_number = number 722 else: 723 collected_multiline_parts += multiline_part.lstrip() 724 725 continue 726 727 # Process final part of multiline rule if one is being collected 728 if collected_multiline_parts is not None: 729 collected_multiline_parts += line.lstrip() 730 731 rule_line = collected_multiline_parts.strip() 732 733 collected_multiline_parts = None 734 # If no multiline rule is being collected process as a potential single line rule 735 else: 736 if len(line.strip()) == 0: 737 continue 738 739 if line.strip().startswith("#"): 740 if evaluate_disabled: 741 # Verify that this line is a rule and not a comment 742 if idstools.rule.parse(line) is None: 743 # Log the comment since it may be a invalid rule 744 _logger.warning( 745 "Ignoring comment on line %i: %s", number, line 746 ) 747 continue 748 else: 749 # Skip the rule 750 continue 751 752 rule_line = line.strip() 753 754 try: 755 rule: Optional[idstools.rule.Rule] = idstools.rule.parse(rule_line) 756 except Exception: # noqa: BLE001 757 _logger.error( 758 "Internal error in idstools parsing rule on line %i: %s", 759 number, 760 rule_line, 761 ) 762 rule = None 763 764 # Parse comment and potential ignore comment to ignore rules 765 ignore = __parse_type_ignore(rule) 766 767 # Verify that a rule was parsed correctly. 768 if rule is None: 769 _logger.error("Error parsing rule on line %i: %s", number, rule_line) 770 continue 771 772 if not is_valid_rule(rule): 773 _logger.error("Invalid rule on line %i: %s", number, rule_line) 774 continue 775 776 _logger.debug("Processing rule: %s on line %i", rule["sid"], number) 777 778 rule_report: RuleReport = analyze_rule( 779 rule, 780 checkers=checkers, 781 ignore=ignore, 782 ) 783 rule_report.line_begin = multiline_begin_number or number 784 rule_report.line_end = number 785 786 output.rules.append(rule_report) 787 788 multiline_begin_number = None 789 790 _logger.info("Completed processing rule file: %s", rules) 791 792 output.summary = __summarize_output(output, checkers) 793 794 return output
795 796 797def __is_valid_idstools_rule(text: str) -> bool: 798 try: 799 rule: Optional[idstools.rule.Rule] = idstools.rule.parse(text) 800 except Exception: # noqa: BLE001 801 return False 802 803 if rule is None: 804 return False 805 806 return True 807 808 809def __parse_type_ignore(rule: Optional[idstools.rule.Rule]) -> Optional[Sequence[str]]: 810 if rule is None: 811 return None 812 813 ignore_value = get_rule_suboption(rule, "metadata", "suricata-check") 814 if ignore_value is None: 815 return [] 816 817 return ignore_value.strip(' "').split(",") 818 819 820def _import_extensions() -> None: 821 global suricata_check_extensions_imported # noqa: PLW0603 822 if suricata_check_extensions_imported is True: 823 return 824 825 for module in pkgutil.iter_modules(): 826 if module.name.startswith("suricata_check_"): 827 try: 828 imported_module = __import__(module.name) 829 _logger.info( 830 "Detected and successfully imported suricata-check extension %s with version %s.", 831 module.name.replace("_", "-"), 832 getattr(imported_module, "__version__"), 833 ) 834 except ImportError: 835 _logger.warning( 836 "Detected potential suricata-check extension %s but failed to import it.", 837 module.name.replace("_", "-"), 838 ) 839 suricata_check_extensions_imported = True 840 841
[docs] 842@lru_cache(maxsize=1) 843def get_checkers( 844 include: Sequence[str] = (".*",), 845 exclude: Sequence[str] = (), 846 issue_severity: int = logging.INFO, 847) -> Sequence[CheckerInterface]: 848 """Auto discovers all available checkers that implement the CheckerInterface. 849 850 Returns: 851 A list of available checkers that implement the CheckerInterface. 852 853 """ 854 # Check for extensions and try to import them 855 _import_extensions() 856 857 checkers: list[CheckerInterface] = [] 858 for checker in get_all_subclasses(CheckerInterface): 859 if checker.__name__ == DummyChecker.__name__: 860 continue 861 862 # Initialize DummyCheckers to retrieve error messages. 863 if issubclass(checker, DummyChecker): 864 checker() 865 866 enabled, relevant_codes = __get_checker_enabled( 867 checker, include, exclude, issue_severity 868 ) 869 870 if enabled: 871 checkers.append(checker(include=relevant_codes)) 872 873 else: 874 _logger.info("Checker %s is disabled.", checker.__name__) 875 876 _logger.info( 877 "Discovered and enabled checkers: [%s]", 878 ", ".join([c.__class__.__name__ for c in checkers]), 879 ) 880 if len(checkers) == 0: 881 _logger.warning( 882 "No checkers were enabled. Check the include and exclude arguments." 883 ) 884 885 # Perform a uniqueness check on the codes emmitted by the checkers 886 for checker1 in checkers: 887 for checker2 in checkers: 888 if checker1 == checker2: 889 continue 890 if not set(checker1.codes).isdisjoint(checker2.codes): 891 msg = f"Checker {checker1.__class__.__name__} and {checker2.__class__.__name__} have overlapping codes." 892 _logger.error(msg) 893 894 return sorted(checkers, key=lambda x: x.__class__.__name__)
895 896 897def __get_checker_enabled( 898 checker: type[CheckerInterface], 899 include: Sequence[str], 900 exclude: Sequence[str], 901 issue_severity: int, 902) -> tuple[bool, set[str]]: 903 enabled = checker.enabled_by_default 904 905 # If no include regexes are provided, include all by default 906 if len(include) == 0: 907 relevant_codes = set(checker.codes.keys()) 908 else: 909 # If include regexes are provided, include all codes that match any of these regexes 910 relevant_codes = set() 911 912 for regex in include: 913 relevant_codes.update( 914 set( 915 filter( 916 lambda code: _regex_provider.compile("^" + regex + "$").match( 917 code 918 ) 919 is not None, 920 checker.codes.keys(), 921 ) 922 ) 923 ) 924 925 if len(relevant_codes) > 0: 926 enabled = True 927 928 # Now remove the codes that are excluded according to any of the provided exclude regexes 929 for regex in exclude: 930 relevant_codes = set( 931 filter( 932 lambda code: _regex_provider.compile("^" + regex + "$").match(code) 933 is None, 934 relevant_codes, 935 ) 936 ) 937 938 # Now filter out irrelevant codes based on severity 939 relevant_codes = set( 940 filter( 941 lambda code: checker.codes[code]["severity"] >= issue_severity, 942 relevant_codes, 943 ) 944 ) 945 946 if len(relevant_codes) == 0: 947 enabled = False 948 949 return enabled, relevant_codes 950 951
[docs] 952def analyze_rule( 953 rule: idstools.rule.Rule, 954 checkers: Optional[Sequence[CheckerInterface]] = None, 955 ignore: Optional[Sequence[str]] = None, 956) -> RuleReport: 957 """Checks a rule and returns a dictionary containing the rule and a list of issues found. 958 959 Args: 960 rule: The rule to be checked. 961 checkers: The checkers to be used to check the rule. 962 ignore: Regular expressions to match checker codes to ignore 963 964 Returns: 965 A list of issues found in the rule. 966 Each issue is typed as a `dict`. 967 968 Raises: 969 InvalidRuleError: If the rule does not follow the Suricata syntax. 970 971 """ 972 if not is_valid_rule(rule): 973 raise InvalidRuleError(rule["raw"]) 974 975 check_rule_option_recognition(rule) 976 977 if checkers is None: 978 checkers = get_checkers() 979 980 rule_report: RuleReport = RuleReport(rule=rule) 981 982 _logger.warning(ignore) 983 984 compiled_ignore = ( 985 [_regex_provider.compile(r) for r in ignore] if ignore is not None else [] 986 ) 987 988 for checker in checkers: 989 try: 990 issues = checker.check_rule(rule) 991 for r in compiled_ignore: 992 issues = list(filter(lambda issue: r.match(issue.code) is None, issues)) 993 rule_report.add_issues(issues) 994 except Exception as exception: # noqa: BLE001 995 _logger.warning( 996 "Failed to run %s on rule: %s", 997 checker.__class__.__name__, 998 rule["raw"], 999 extra={"exception": exception}, 1000 ) 1001 1002 rule_report.summary = __summarize_rule(rule_report, checkers) 1003 1004 return rule_report
1005 1006 1007def __summarize_rule( 1008 rule: RuleReport, 1009 checkers: Optional[Sequence[CheckerInterface]] = None, 1010) -> RULE_SUMMARY_TYPE: 1011 """Summarizes the issues found in a rule. 1012 1013 Args: 1014 rule: The rule output dictionary to be summarized. 1015 checkers: The checkers to be used to check the rule. 1016 1017 Returns: 1018 A dictionary containing a summary of all issues found in the rule. 1019 1020 """ 1021 if checkers is None: 1022 checkers = get_checkers() 1023 1024 summary = {} 1025 1026 issues: ISSUES_TYPE = rule.issues 1027 summary["total_issues"] = len(issues) 1028 summary["issues_by_group"] = defaultdict(int) 1029 for issue in issues: 1030 checker = issue.checker 1031 summary["issues_by_group"][checker] += 1 1032 1033 # Ensure also checkers without issues are included in the report. 1034 for checker in checkers: 1035 if checker.__class__.__name__ not in summary["issues_by_group"]: 1036 summary["issues_by_group"][checker.__class__.__name__] = 0 1037 1038 # Sort dictionaries for deterministic output 1039 summary["issues_by_group"] = __sort_mapping(summary["issues_by_group"]) 1040 1041 return summary 1042 1043 1044def __summarize_output( 1045 output: OutputReport, 1046 checkers: Optional[Sequence[CheckerInterface]] = None, 1047) -> OutputSummary: 1048 """Summarizes the issues found in a rules file. 1049 1050 Args: 1051 output: The unsammarized output of the rules file containing all rules and their issues. 1052 checkers: The checkers to be used to check the rule. 1053 1054 Returns: 1055 A dictionary containing a summary of all issues found in the rules file. 1056 1057 """ 1058 if checkers is None: 1059 checkers = get_checkers() 1060 1061 return OutputSummary( 1062 overall_summary=__get_overall_summary(output), 1063 issues_by_group=__get_issues_by_group(output, checkers), 1064 issues_by_type=__get_issues_by_type(output, checkers), 1065 ) 1066 1067 1068def __get_overall_summary( 1069 output: OutputReport, 1070) -> SIMPLE_SUMMARY_TYPE: 1071 overall_summary = { 1072 "Total Issues": 0, 1073 "Rules with Issues": 0, 1074 "Rules without Issues": 0, 1075 } 1076 1077 rules: RULE_REPORTS_TYPE = output.rules 1078 for rule in rules: 1079 issues: ISSUES_TYPE = rule.issues 1080 overall_summary["Total Issues"] += len(issues) 1081 1082 if len(issues) == 0: 1083 overall_summary["Rules without Issues"] += 1 1084 else: 1085 overall_summary["Rules with Issues"] += 1 1086 1087 return overall_summary 1088 1089 1090def __get_issues_by_group( 1091 output: OutputReport, 1092 checkers: Optional[Sequence[CheckerInterface]] = None, 1093) -> SIMPLE_SUMMARY_TYPE: 1094 if checkers is None: 1095 checkers = get_checkers() 1096 1097 issues_by_group = defaultdict(int) 1098 1099 # Ensure also checkers and codes without issues are included in the report. 1100 for checker in checkers: 1101 issues_by_group[checker.__class__.__name__] = 0 1102 1103 rules: RULE_REPORTS_TYPE = output.rules 1104 for rule in rules: 1105 issues: ISSUES_TYPE = rule.issues 1106 1107 for issue in issues: 1108 checker = issue.checker 1109 if checker is not None: 1110 issues_by_group[checker] += 1 1111 1112 return __sort_mapping(issues_by_group) 1113 1114 1115def __get_issues_by_type( 1116 output: OutputReport, 1117 checkers: Optional[Sequence[CheckerInterface]] = None, 1118) -> EXTENSIVE_SUMMARY_TYPE: 1119 if checkers is None: 1120 checkers = get_checkers() 1121 issues_by_type: EXTENSIVE_SUMMARY_TYPE = defaultdict(lambda: defaultdict(int)) 1122 1123 # Ensure also checkers and codes without issues are included in the report. 1124 for checker in checkers: 1125 for code in checker.codes: 1126 issues_by_type[checker.__class__.__name__][code] = 0 1127 1128 rules: RULE_REPORTS_TYPE = output.rules 1129 for rule in rules: 1130 issues: ISSUES_TYPE = rule.issues 1131 1132 checker_codes = defaultdict(lambda: defaultdict(int)) 1133 for issue in issues: 1134 checker = issue.checker 1135 if checker is not None: 1136 code = issue.code 1137 checker_codes[checker][code] += 1 1138 1139 for checker, codes in checker_codes.items(): 1140 for code, count in codes.items(): 1141 issues_by_type[checker][code] += count 1142 1143 for key in issues_by_type: 1144 issues_by_type[key] = __sort_mapping(issues_by_type[key]) 1145 1146 return __sort_mapping(issues_by_type) 1147 1148 1149def __sort_mapping(mapping: Mapping) -> dict: 1150 return {key: mapping[key] for key in sorted(mapping.keys())} 1151 1152 1153if __name__ == "__main__": 1154 main()