Source code for suricata_check.suricata_check

  1"""The `suricata_check.suricata_check` module contains the command line utility and the main program logic."""
  2
  3import atexit
  4import io
  5import json
  6import logging
  7import logging.handlers
  8import multiprocessing
  9import os
 10import pkgutil
 11import sys
 12from collections import defaultdict
 13from collections.abc import Mapping, Sequence
 14from functools import lru_cache
 15from typing import (
 16    Literal,
 17    Optional,
 18    Union,
 19)
 20
 21import click
 22import idstools.rule
 23import tabulate
 24
 25# Add suricata-check to the front of sys.path, such that the version corresponding to the CLI is used.
 26_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
 27if sys.path[0] != _suricata_check_path:
 28    sys.path.insert(0, _suricata_check_path)
 29
 30from suricata_check import __version__, get_dependency_versions  # noqa: E402
 31from suricata_check.checkers.interface import CheckerInterface  # noqa: E402
 32from suricata_check.checkers.interface.dummy import DummyChecker  # noqa: E402
 33from suricata_check.utils._click import ClickHandler, ClickHelpOption  # noqa: E402
 34from suricata_check.utils._path import find_rules_file  # noqa: E402
 35from suricata_check.utils.checker import check_rule_option_recognition  # noqa: E402
 36from suricata_check.utils.checker_typing import (  # noqa: E402
 37    EXTENSIVE_SUMMARY_TYPE,
 38    ISSUES_TYPE,
 39    RULE_REPORTS_TYPE,
 40    RULE_SUMMARY_TYPE,
 41    SIMPLE_SUMMARY_TYPE,
 42    InvalidRuleError,
 43    OutputReport,
 44    OutputSummary,
 45    RuleReport,
 46    get_all_subclasses,
 47)
 48from suricata_check.utils.regex import get_regex_provider, is_valid_rule  # noqa: E402
 49
 50LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
 51LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
 52GITLAB_SEVERITIES = {
 53    logging.DEBUG: "info",
 54    logging.INFO: "info",
 55    logging.WARNING: "minor",
 56    logging.ERROR: "major",
 57    logging.CRITICAL: "critical",
 58}
 59GITHUB_SEVERITIES = {
 60    logging.DEBUG: "debug",
 61    logging.INFO: "notice",
 62    logging.WARNING: "warning",
 63    logging.ERROR: "error",
 64    logging.CRITICAL: "error",
 65}
 66GITHUB_COMMAND = (
 67    "::{level} file={file},line={line},endLine={end_line},title={title}::{message}"
 68)
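# A minimal sketch of what a rendered workflow command looks like (file, line numbers,
# checker name, code, and message are hypothetical):
#   ::warning file=local.rules,line=10,endLine=12,title=ExampleChecker - S000::Example issue message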
 69
 70_logger = logging.getLogger(__name__)
 71
 72_regex_provider = get_regex_provider()
 73
 74# Global variable to check if extensions have already been imported in case get_checkers() is called multiple times.
 75suricata_check_extensions_imported = False
 76
 77
 78@click.command()
 79@click.option(
 80    "--rules",
 81    "-r",
 82    default=".",
 83    help="Path to the Suricata rules file or directory to check.",
 84    show_default=True,
 85)
 86@click.option(
 87    "--single-rule",
 88    "-s",
 89    help="A single Suricata rule to be checked.",
 90    show_default=False,
 91)
 92@click.option(
 93    "--out",
 94    "-o",
 95    default=".",
 96    help="Path to suricata-check output folder.",
 97    show_default=True,
 98)
 99@click.option(
100    "--log-level",
101    default="INFO",
102    help=f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
103    show_default=True,
104)
105@click.option(
106    "--gitlab",
107    default=False,
108    help="Flag to create CodeClimate output report for GitLab CI/CD.",
109    show_default=True,
110    is_flag=True,
111)
112@click.option(
113    "--github",
114    default=False,
115    help="Flag to write workflow commands to stdout for GitHub CI/CD.",
116    show_default=True,
117    is_flag=True,
118)
119@click.option(
120    "--evaluate-disabled",
121    default=False,
122    help="Flag to evaluate disabled rules.",
123    show_default=True,
124    is_flag=True,
125)
126@click.option(
127    "--issue-severity",
128    default="INFO",
129    help=f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
130    show_default=True,
131)
132@click.option(
133    "--include-all",
134    "-a",
135    default=False,
136    help="Flag to indicate all checker codes should be enabled.",
137    show_default=True,
138    is_flag=True,
139)
140@click.option(
141    "--include",
142    "-i",
143    default=(),
144    help="List of all checker codes to enable.",
145    show_default=True,
146    multiple=True,
147)
148@click.option(
149    "--exclude",
150    "-e",
151    default=(),
152    help="List of all checker codes to disable.",
153    show_default=True,
154    multiple=True,
155)
156@click.help_option("-h", "--help", cls=ClickHelpOption)
157def main(  # noqa: PLR0913, PLR0915
158    rules: str = ".",
159    single_rule: Optional[str] = None,
160    out: str = ".",
161    log_level: LogLevel = "DEBUG",
162    gitlab: bool = False,
163    github: bool = False,
164    evaluate_disabled: bool = False,
165    issue_severity: LogLevel = "INFO",
166    include_all: bool = False,
167    include: tuple[str, ...] = (),
168    exclude: tuple[str, ...] = (),
169) -> None:
170    """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.
171
172    Raises:
173      BadParameter: If provided arguments are invalid.
174
175      RuntimeError: If no checkers could be automatically discovered.
176
177    """
178    # Verify that out argument is valid
179    if os.path.exists(out) and not os.path.isdir(out):
180        raise click.BadParameter(f"Error: {out} is not a directory.")
181
182    # Verify that log_level argument is valid
183    if log_level not in LOG_LEVELS:
184        raise click.BadParameter(f"Error: {log_level} is not a valid log level.")
185
186    # Create out directory if non-existent
187    if not os.path.exists(out):
188        os.makedirs(out)
189
190    # Set up logging from a separate thread
191    queue = multiprocessing.Manager().Queue()
192    queue_handler = logging.handlers.QueueHandler(queue)
193
194    click_handler = ClickHandler(
195        github=github, github_level=getattr(logging, log_level)
196    )
197    logging.basicConfig(
198        level=log_level,
199        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
200        handlers=(queue_handler, click_handler),
201        force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", False) == "TRUE",
202    )
203
204    file_handler = logging.FileHandler(
205        filename=os.path.join(out, "suricata-check.log"),
206        delay=True,
207    )
208    queue_listener = logging.handlers.QueueListener(
209        queue,
210        file_handler,
211        respect_handler_level=True,
212    )
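    # Records passed to the root logger are handled twice: the ClickHandler echoes them
    # to the console (and, with --github, as workflow commands), while the QueueHandler
    # forwards them through the queue to the QueueListener, which writes them to
    # suricata-check.log from a separate thread once started below.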
213
214    def _at_exit() -> None:
215        """Cleans up logging listener and handlers before exiting."""
216        queue_listener.enqueue_sentinel()
217        queue_listener.stop()
218        file_handler.flush()
219        file_handler.close()
220        atexit.unregister(_at_exit)
221
222    atexit.register(_at_exit)
223
224    queue_listener.start()
225
226    # Log the arguments:
227    _logger.info("Running suricata-check with the following arguments:")
228    _logger.info("out: %s", out)
229    _logger.info("rules: %s", rules)
230    _logger.info("single_rule: %s", single_rule)
231    _logger.info("log_level: %s", log_level)
232    _logger.info("gitlab: %s", gitlab)
233    _logger.info("github: %s", github)
234    _logger.info("evaluate_disabled: %s", evaluate_disabled)
235    _logger.info("issue_severity: %s", issue_severity)
236    _logger.info("include_all: %s", include_all)
237    _logger.info("include: %s", include)
238    _logger.info("exclude: %s", exclude)
239
240    # Log the environment:
241    _logger.debug("Platform: %s", sys.platform)
242    _logger.debug("Python version: %s", sys.version)
243    _logger.debug("suricata-check path: %s", _suricata_check_path)
244    _logger.debug("suricata-check version: %s", __version__)
245    for package, version in get_dependency_versions().items():
246        _logger.debug("Dependency %s version: %s", package, version)
247
248    # Verify that include and exclude arguments are valid
249    if include_all and len(include) > 0:
250        raise click.BadParameter(
251            "Error: Cannot use --include-all and --include together."
252        )
253    if include_all:
254        include = (".*",)
255
256    # Verify that issue_severity argument is valid
257    if issue_severity not in LOG_LEVELS:
258        raise click.BadParameter(
259            f"Error: {issue_severity} is not a valid issue severity or log level."
260        )
261
262    checkers = get_checkers(
263        include, exclude, issue_severity=getattr(logging, issue_severity)
264    )
265
266    if single_rule is not None:
267        __main_single_rule(out, single_rule, checkers)
268
269        # Return here so no rules file is processed.
270        _at_exit()
271        return
272
273    # Check if the rules argument is valid and find the rules file
274    rules = find_rules_file(rules)
275
276    output = process_rules_file(rules, evaluate_disabled, checkers=checkers)
277
278    __write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)
279
280    _at_exit()
281
282
283def __main_single_rule(
284    out: str, single_rule: str, checkers: Optional[Sequence[CheckerInterface]]
285) -> None:
286    rule: Optional[idstools.rule.Rule] = idstools.rule.parse(single_rule)
287
288    # Verify that a rule was parsed correctly.
289    if rule is None:
290        msg = f"Error parsing rule from user input: {single_rule}"
291        _logger.critical(msg)
292        raise click.BadParameter(f"Error: {msg}")
293
294    if not is_valid_rule(rule):
295        msg = f"Invalid rule from user input: {single_rule}"
296        _logger.critical(msg)
297        raise click.BadParameter(f"Error: {msg}")
298
299    _logger.debug("Processing rule: %s", rule["sid"])
300
301    rule_report = analyze_rule(rule, checkers=checkers)
302
303    __write_output(OutputReport(rules=[rule_report]), out)
304
305
306def __write_output(
307    output: OutputReport,
308    out: str,
309    gitlab: bool = False,
310    github: bool = False,
311    rules_file: Optional[str] = None,
312) -> None:
313    _logger.info(
314        "Writing output to suricata-check.jsonl and suricata-check-fast.log in %s",
315        os.path.abspath(out),
316    )
317    with (
318        open(
319            os.path.join(out, "suricata-check.jsonl"),
320            "w",
321            buffering=io.DEFAULT_BUFFER_SIZE,
322        ) as jsonl_fh,
323        open(
324            os.path.join(out, "suricata-check-fast.log"),
325            "w",
326            buffering=io.DEFAULT_BUFFER_SIZE,
327        ) as fast_fh,
328    ):
329        rules: RULE_REPORTS_TYPE = output.rules
330        jsonl_fh.write("\n".join([str(rule) for rule in rules]))
331
332        for rule_report in rules:
333            rule: idstools.rule.Rule = rule_report.rule
334            lines: str = (
335                "{}-{}".format(rule_report.line_begin, rule_report.line_end)
336                if rule_report.line_begin
337                else "Unknown"
338            )
339            issues: ISSUES_TYPE = rule_report.issues
340            for issue in issues:
341                code = issue.code
342                severity = (
343                    logging.getLevelName(issue.severity) if issue.severity else None
344                )
345                issue_msg = issue.message.replace("\n", " ")
346
347                msg = "[{}]{} Lines {}, sid {}: {}".format(
348                    code,
349                    f" ({severity})" if severity else "",
350                    lines,
351                    rule["sid"],
352                    issue_msg,
353                )
354                fast_fh.write(msg + "\n")
355                click.secho(msg, color=True, fg="blue")
356
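    # Each line written to suricata-check-fast.log (and echoed to the console) follows
    # the format above; a hypothetical example:
    #   [S000] (INFO) Lines 10-12, sid 2000001: Example issue message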
357    if output.summary is not None:
358        __write_output_stats(output, out)
359
360    if gitlab:
361        assert rules_file is not None
362
363        __write_output_gitlab(output, out, rules_file)
364
365    if github:
366        assert rules_file is not None
367
368        __write_output_github(output, rules_file)
369
370
371def __write_output_stats(output: OutputReport, out: str) -> None:
372    assert output.summary is not None
373
374    with open(
375        os.path.join(out, "suricata-check-stats.log"),
376        "w",
377        buffering=io.DEFAULT_BUFFER_SIZE,
378    ) as stats_fh:
379        summary: OutputSummary = output.summary
380
381        overall_summary: SIMPLE_SUMMARY_TYPE = summary.overall_summary
382
383        n_issues = overall_summary["Total Issues"]
384        n_rules = (
385            overall_summary["Rules with Issues"]
386            + overall_summary["Rules without Issues"]
387        )
388
389        stats_fh.write(
390            tabulate.tabulate(
391                (
392                    (
393                        k,
394                        v,
395                        (
396                            "{:.0%}".format(v / n_rules)
397                            if k.startswith("Rules ") and n_rules > 0
398                            else "-"
399                        ),
400                    )
401                    for k, v in overall_summary.items()
402                ),
403                headers=(
404                    "Count",
405                    "Percentage of Rules",
406                ),
407            )
408            + "\n\n",
409        )
410
411        click.secho(
412            f"Total issues found: {overall_summary['Total Issues']}",
413            color=True,
414            bold=True,
415            fg="blue",
416        )
417        click.secho(
418            f"Rules with Issues found: {overall_summary['Rules with Issues']}",
419            color=True,
420            bold=True,
421            fg="blue",
422        )
423
424        issues_by_group: SIMPLE_SUMMARY_TYPE = summary.issues_by_group
425
426        stats_fh.write(
427            tabulate.tabulate(
428                (
429                    (k, v, "{:.0%}".format(v / n_issues) if n_issues > 0 else "-")
430                    for k, v in issues_by_group.items()
431                ),
432                headers=(
433                    "Count",
434                    "Percentage of Total Issues",
435                ),
436            )
437            + "\n\n",
438        )
439
440        issues_by_type: EXTENSIVE_SUMMARY_TYPE = summary.issues_by_type
441        for checker, checker_issues_by_type in issues_by_type.items():
442            stats_fh.write(" " + checker + " " + "\n")
443            stats_fh.write("-" * (len(checker) + 2) + "\n")
444            stats_fh.write(
445                tabulate.tabulate(
446                    (
447                        (
448                            k,
449                            v,
450                            "{:.0%}".format(v / n_rules) if n_rules > 0 else "-",
451                        )
452                        for k, v in checker_issues_by_type.items()
453                    ),
454                    headers=(
455                        "Count",
456                        "Percentage of Rules",
457                    ),
458                )
459                + "\n\n",
460            )
461
462
463def __write_output_gitlab(output: OutputReport, out: str, rules_file: str) -> None:
464    with open(
465        os.path.join(out, "suricata-check-gitlab.json"),
466        "w",
467        buffering=io.DEFAULT_BUFFER_SIZE,
468    ) as gitlab_fh:
469        issue_dicts = []
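        # Each entry appended below follows the CodeClimate issue format; a hypothetical
        # serialized example:
        #   {"description": "Example issue message", "categories": ["ExampleChecker"],
        #    "check_name": "Suricata Check S000", "fingerprint": "abc123...",
        #    "severity": "minor",
        #    "location": {"path": "local.rules", "lines": {"begin": 10, "end": 12}}}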
470        for rule_report in output.rules:
471            line_begin: Optional[int] = rule_report.line_begin
472            assert line_begin is not None
473            line_end: Optional[int] = rule_report.line_end
474            assert line_end is not None
475            issues: ISSUES_TYPE = rule_report.issues
476            for issue in issues:
477                code = issue.code
478                issue_msg = issue.message.replace("\n", " ")
479                assert issue.checker is not None
480                issue_checker = issue.checker
481                issue_hash = str(issue.hash)
482                assert issue.severity is not None
483                issue_severity = GITLAB_SEVERITIES[issue.severity]
484
485                issue_dict: Mapping[
486                    str,
487                    Union[str, list[str], Mapping[str, Union[str, Mapping[str, int]]]],
488                ] = {
489                    "description": issue_msg,
490                    "categories": [issue_checker],
491                    "check_name": f"Suricata Check {code}",
492                    "fingerprint": issue_hash,
493                    "severity": issue_severity,
494                    "location": {
495                        "path": rules_file,
496                        "lines": {"begin": line_begin, "end": line_end},
497                    },
498                }
499                issue_dicts.append(issue_dict)
500
501        gitlab_fh.write(json.dumps(issue_dicts))
502
503
504def __write_output_github(output: OutputReport, rules_file: str) -> None:
505    output_lines: dict[str, list[str]] = {
506        k: [] for k in set(GITHUB_SEVERITIES.values())
507    }
508    for rule_report in output.rules:
509        line_begin: Optional[int] = rule_report.line_begin
510        assert line_begin is not None
511        line_end: Optional[int] = rule_report.line_end
512        assert line_end is not None
513        issues: ISSUES_TYPE = rule_report.issues
514        for issue in issues:
515            code = issue.code
516            issue_msg = issue.message.replace("\n", " ")
517            assert issue.checker is not None
518            issue_checker = issue.checker
519            assert issue.severity is not None
520            issue_severity = GITHUB_SEVERITIES[issue.severity]
521            title = f"{issue_checker} - {code}"
522
523            output_lines[issue_severity].append(
524                GITHUB_COMMAND.format(
525                    level=issue_severity,
526                    file=rules_file,
527                    line=line_begin,
528                    end_line=line_end,
529                    title=title,
530                    message=issue_msg,
531                )
532            )
533
534    for message_level, lines in output_lines.items():
535        if len(lines) > 0:
536            print(f"::group::{message_level}")  # noqa: T201
537            for message in lines:
538                print(message)  # noqa: T201
539            print("::endgroup::")  # noqa: T201
540
541
542def process_rules_file(  # noqa: C901, PLR0912
543    rules: str,
544    evaluate_disabled: bool,
545    checkers: Optional[Sequence[CheckerInterface]] = None,
546) -> OutputReport:
547    """Processes a rule file and returns a report of the rules and their issues.
548
549    Args:
550      rules: A path to a Suricata rules file.
551      evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
552      checkers: The checkers to be used when processing the rule file.
553
554    Returns:
555      An `OutputReport` containing all rules and their issues.
556
557    Raises:
558      RuntimeError: If no checkers could be automatically discovered.
559
560    """
561    if checkers is None:
562        checkers = get_checkers()
563
564    output = OutputReport()
565
566    with (
567        open(
568            os.path.normpath(rules),
569            buffering=io.DEFAULT_BUFFER_SIZE,
570        ) as rules_fh,
571    ):
572        if len(checkers) == 0:
573            msg = "No checkers provided for processing rules."
574            _logger.error(msg)
575            raise RuntimeError(msg)
576
577        _logger.info("Processing rule file: %s", rules)
578
579        collected_multiline_parts: Optional[str] = None
580        multiline_begin_number: Optional[int] = None
581
582        for number, line in enumerate(rules_fh.readlines(), start=1):
583            # First work on collecting and parsing multiline rules
584            if line.rstrip("\r\n").endswith("\\"):
585                multiline_part = line.rstrip("\r\n")[:-1]
586
587                if collected_multiline_parts is None:
588                    collected_multiline_parts = multiline_part
589                    multiline_begin_number = number
590                else:
591                    collected_multiline_parts += multiline_part.lstrip()
592
593                continue
594
595            # Process final part of multiline rule if one is being collected
596            if collected_multiline_parts is not None:
597                collected_multiline_parts += line.lstrip()
598
599                rule: Optional[idstools.rule.Rule] = idstools.rule.parse(
600                    collected_multiline_parts
601                )
602                collected_multiline_parts = None
603            # If no multiline rule is being collected, process as a potential single-line rule
604            else:
605                if len(line.strip()) == 0:
606                    continue
607
608                if line.startswith("#"):
609                    if evaluate_disabled:
610                        # Verify that this line is a rule and not a comment
611                        if idstools.rule.parse(line) is None:
612                            # Log the comment since it may be an invalid rule
613                            _logger.warning(
614                                "Ignoring comment on line %i: %s", number, line
615                            )
616                            continue
617                    else:
618                        # Skip the rule
619                        continue
620
621                rule: Optional[idstools.rule.Rule] = idstools.rule.parse(line)
622
623            # Verify that a rule was parsed correctly.
624            if rule is None:
625                _logger.error("Error parsing rule on line %i: %s", number, line)
626                continue
627
628            if not is_valid_rule(rule):
629                _logger.error("Invalid rule on line %i: %s", number, line)
630                continue
631
632            _logger.debug("Processing rule: %s on line %i", rule["sid"], number)
633
634            rule_report: RuleReport = analyze_rule(
635                rule,
636                checkers=checkers,
637            )
638            rule_report.line_begin = multiline_begin_number or number
639            rule_report.line_end = number
640            output.rules.append(rule_report)
641
642            multiline_begin_number = None
643
644    _logger.info("Completed processing rule file: %s", rules)
645
646    output.summary = __summarize_output(output, checkers)
647
648    return output
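# Note on the multiline handling in process_rules_file above: lines ending in a backslash
# are collected and joined before parsing, so a hypothetical rule written as
#   alert tcp any any -> any any (msg:"Example"; \
#       sid:1; rev:1;)
# is reassembled into a single rule, with line_begin set to the line of the first part
# and line_end set to the line of the final part.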
649
650
651def _import_extensions() -> None:
652    global suricata_check_extensions_imported  # noqa: PLW0603
653    if suricata_check_extensions_imported is True:
654        return
655
656    for module in pkgutil.iter_modules():
657        if module.name.startswith("suricata_check_"):
658            try:
659                imported_module = __import__(module.name)
660                _logger.info(
661                    "Detected and successfully imported suricata-check extension %s with version %s.",
662                    module.name.replace("_", "-"),
663                    getattr(imported_module, "__version__"),
664                )
665            except ImportError:
666                _logger.warning(
667                    "Detected potential suricata-check extension %s but failed to import it.",
668                    module.name.replace("_", "-"),
669                )
670    suricata_check_extensions_imported = True
671
672
673@lru_cache(maxsize=1)
674def get_checkers(
675    include: Sequence[str] = (".*",),
676    exclude: Sequence[str] = (),
677    issue_severity: int = logging.INFO,
678) -> Sequence[CheckerInterface]:
679    """Auto-discovers all available checkers that implement the CheckerInterface.
680
681    Returns:
682      A list of available checkers that implement the CheckerInterface.
683
684    """
685    # Check for extensions and try to import them
686    _import_extensions()
687
688    checkers: list[CheckerInterface] = []
689    for checker in get_all_subclasses(CheckerInterface):
690        if checker.__name__ == DummyChecker.__name__:
691            continue
692
693        # Initialize DummyCheckers to retrieve error messages.
694        if issubclass(checker, DummyChecker):
695            checker()
696
697        enabled, relevant_codes = __get_checker_enabled(
698            checker, include, exclude, issue_severity
699        )
700
701        if enabled:
702            checkers.append(checker(include=relevant_codes))
703
704        else:
705            _logger.info("Checker %s is disabled.", checker.__name__)
706
707    _logger.info(
708        "Discovered and enabled checkers: [%s]",
709        ", ".join([c.__class__.__name__ for c in checkers]),
710    )
711    if len(checkers) == 0:
712        _logger.warning(
713            "No checkers were enabled. Check the include and exclude arguments."
714        )
715
716    # Perform a uniqueness check on the codes emitted by the checkers
717    for checker1 in checkers:
718        for checker2 in checkers:
719            if checker1 == checker2:
720                continue
721            if not set(checker1.codes).isdisjoint(checker2.codes):
722                msg = f"Checker {checker1.__class__.__name__} and {checker2.__class__.__name__} have overlapping codes."
723                _logger.error(msg)
724
725    return sorted(checkers, key=lambda x: x.__class__.__name__)
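# A minimal sketch of using these functions as a library (the rules path and printed
# fields are hypothetical; the functions and arguments are defined in this module):
#   from suricata_check.suricata_check import get_checkers, process_rules_file
#   checkers = get_checkers(include=(".*",), issue_severity=logging.WARNING)
#   report = process_rules_file("local.rules", evaluate_disabled=False, checkers=checkers)
#   for rule_report in report.rules:
#       print(rule_report.rule["sid"], len(rule_report.issues))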
726
727
728def __get_checker_enabled(
729    checker: type[CheckerInterface],
730    include: Sequence[str],
731    exclude: Sequence[str],
732    issue_severity: int,
733) -> tuple[bool, set[str]]:
734    enabled = checker.enabled_by_default
735
736    # If no include regexes are provided, include all by default
737    if len(include) == 0:
738        relevant_codes = set(checker.codes.keys())
739    else:
740        # If include regexes are provided, include all codes that match any of these regexes
741        relevant_codes = set()
742
743        for regex in include:
744            relevant_codes.update(
745                set(
746                    filter(
747                        lambda code: _regex_provider.compile("^" + regex + "$").match(
748                            code
749                        )
750                        is not None,
751                        checker.codes.keys(),
752                    )
753                )
754            )
755
756        if len(relevant_codes) > 0:
757            enabled = True
758
759    # Now remove the codes that are excluded according to any of the provided exclude regexes
760    for regex in exclude:
761        relevant_codes = set(
762            filter(
763                lambda code: _regex_provider.compile("^" + regex + "$").match(code)
764                is None,
765                relevant_codes,
766            )
767        )
768
769    # Now filter out irrelevant codes based on severity
770    relevant_codes = set(
771        filter(
772            lambda code: checker.codes[code]["severity"] >= issue_severity,
773            relevant_codes,
774        )
775    )
776
777    if len(relevant_codes) == 0:
778        enabled = False
779
780    return enabled, relevant_codes
781
782
783def analyze_rule(
784    rule: idstools.rule.Rule,
785    checkers: Optional[Sequence[CheckerInterface]] = None,
786) -> RuleReport:
787    """Checks a rule and returns a report containing the rule and a list of issues found.
788
789    Args:
790      rule: The rule to be checked.
791      checkers: The checkers to be used to check the rule.
792
793    Returns:
794      A `RuleReport` containing the rule and all issues found in it,
795      together with a summary of those issues.
796
797    Raises:
798      InvalidRuleError: If the rule does not follow the Suricata syntax.
799
800    """
801    if not is_valid_rule(rule):
802        raise InvalidRuleError(rule["raw"])
803
804    check_rule_option_recognition(rule)
805
806    if checkers is None:
807        checkers = get_checkers()
808
809    rule_report: RuleReport = RuleReport(rule=rule)
810
811    for checker in checkers:
812        try:
813            rule_report.add_issues(checker.check_rule(rule))
814        except Exception as exception:  # noqa: BLE001
815            _logger.warning(
816                "Failed to run %s on rule: %s",
817                checker.__class__.__name__,
818                rule["raw"],
819                extra={"exception": exception},
820            )
821
822    rule_report.summary = __summarize_rule(rule_report, checkers)
823
824    return rule_report
825
826
827def __summarize_rule(
828    rule: RuleReport,
829    checkers: Optional[Sequence[CheckerInterface]] = None,
830) -> RULE_SUMMARY_TYPE:
831    """Summarizes the issues found in a rule.
832
833    Args:
834      rule: The rule report to be summarized.
835      checkers: The checkers that were used to check the rule.
836
837    Returns:
838      A dictionary containing a summary of all issues found in the rule.
839
840    """
841    if checkers is None:
842        checkers = get_checkers()
843
844    summary = {}
845
846    issues: ISSUES_TYPE = rule.issues
847    summary["total_issues"] = len(issues)
848    summary["issues_by_group"] = defaultdict(int)
849    for issue in issues:
850        checker = issue.checker
851        summary["issues_by_group"][checker] += 1
852
853    # Ensure also checkers without issues are included in the report.
854    for checker in checkers:
855        if checker.__class__.__name__ not in summary["issues_by_group"]:
856            summary["issues_by_group"][checker.__class__.__name__] = 0
857
858    # Sort dictionaries for deterministic output
859    summary["issues_by_group"] = __sort_mapping(summary["issues_by_group"])
860
861    return summary
862
863
864def __summarize_output(
865    output: OutputReport,
866    checkers: Optional[Sequence[CheckerInterface]] = None,
867) -> OutputSummary:
868    """Summarizes the issues found in a rules file.
869
870    Args:
871      output: The unsummarized output of the rules file containing all rules and their issues.
872      checkers: The checkers that were used to check the rules.
873
874    Returns:
875      A dictionary containing a summary of all issues found in the rules file.
876
877    """
878    if checkers is None:
879        checkers = get_checkers()
880
881    return OutputSummary(
882        overall_summary=__get_overall_summary(output),
883        issues_by_group=__get_issues_by_group(output, checkers),
884        issues_by_type=__get_issues_by_type(output, checkers),
885    )
886
887
888def __get_overall_summary(
889    output: OutputReport,
890) -> SIMPLE_SUMMARY_TYPE:
891    overall_summary = {
892        "Total Issues": 0,
893        "Rules with Issues": 0,
894        "Rules without Issues": 0,
895    }
896
897    rules: RULE_REPORTS_TYPE = output.rules
898    for rule in rules:
899        issues: ISSUES_TYPE = rule.issues
900        overall_summary["Total Issues"] += len(issues)
901
902        if len(issues) == 0:
903            overall_summary["Rules without Issues"] += 1
904        else:
905            overall_summary["Rules with Issues"] += 1
906
907    return overall_summary
908
909
910def __get_issues_by_group(
911    output: OutputReport,
912    checkers: Optional[Sequence[CheckerInterface]] = None,
913) -> SIMPLE_SUMMARY_TYPE:
914    if checkers is None:
915        checkers = get_checkers()
916
917    issues_by_group = defaultdict(int)
918
919    # Ensure also checkers and codes without issues are included in the report.
920    for checker in checkers:
921        issues_by_group[checker.__class__.__name__] = 0
922
923    rules: RULE_REPORTS_TYPE = output.rules
924    for rule in rules:
925        issues: ISSUES_TYPE = rule.issues
926
927        for issue in issues:
928            checker = issue.checker
929            if checker is not None:
930                issues_by_group[checker] += 1
931
932    return __sort_mapping(issues_by_group)
933
934
935def __get_issues_by_type(
936    output: OutputReport,
937    checkers: Optional[Sequence[CheckerInterface]] = None,
938) -> EXTENSIVE_SUMMARY_TYPE:
939    if checkers is None:
940        checkers = get_checkers()
941    issues_by_type: EXTENSIVE_SUMMARY_TYPE = defaultdict(lambda: defaultdict(int))
942
943    # Ensure also checkers and codes without issues are included in the report.
944    for checker in checkers:
945        for code in checker.codes:
946            issues_by_type[checker.__class__.__name__][code] = 0
947
948    rules: RULE_REPORTS_TYPE = output.rules
949    for rule in rules:
950        issues: ISSUES_TYPE = rule.issues
951
952        checker_codes = defaultdict(lambda: defaultdict(int))
953        for issue in issues:
954            checker = issue.checker
955            if checker is not None:
956                code = issue.code
957                checker_codes[checker][code] += 1
958
959        for checker, codes in checker_codes.items():
960            for code, count in codes.items():
961                issues_by_type[checker][code] += count
962
963    for key in issues_by_type:
964        issues_by_type[key] = __sort_mapping(issues_by_type[key])
965
966    return __sort_mapping(issues_by_type)
967
968
969def __sort_mapping(mapping: Mapping) -> dict:
970    return {key: mapping[key] for key in sorted(mapping.keys())}
971
972
973if __name__ == "__main__":
974    main()