1"""The `suricata_check.suricata_check` module contains the command line utility and the main program logic."""
2
3import atexit
4import io
5import json
6import logging
7import logging.handlers
8import multiprocessing
9import os
10import pkgutil
11import sys
12from collections import defaultdict
13from collections.abc import Mapping, Sequence
14from functools import lru_cache
15from typing import (
16 Literal,
17 Optional,
18 Union,
19)
20
21import click
22import idstools.rule
23import tabulate
24
25# Add suricata-check to the front of the PATH, such that the version corresponding to the CLI is used.
26_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
27if sys.path[0] != _suricata_check_path:
28 sys.path.insert(0, _suricata_check_path)
29
30from suricata_check import __version__, get_dependency_versions # noqa: E402
31from suricata_check.checkers.interface import CheckerInterface # noqa: E402
32from suricata_check.checkers.interface.dummy import DummyChecker # noqa: E402
33from suricata_check.utils._click import ClickHandler, ClickHelpOption # noqa: E402
34from suricata_check.utils._path import find_rules_file # noqa: E402
35from suricata_check.utils.checker import check_rule_option_recognition # noqa: E402
36from suricata_check.utils.checker_typing import ( # noqa: E402
37 EXTENSIVE_SUMMARY_TYPE,
38 ISSUES_TYPE,
39 RULE_REPORTS_TYPE,
40 RULE_SUMMARY_TYPE,
41 SIMPLE_SUMMARY_TYPE,
42 InvalidRuleError,
43 OutputReport,
44 OutputSummary,
45 RuleReport,
46 get_all_subclasses,
47)
48from suricata_check.utils.regex import get_regex_provider, is_valid_rule # noqa: E402
49
50LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
51LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
52GITLAB_SEVERITIES = {
53 logging.DEBUG: "info",
54 logging.INFO: "info",
55 logging.WARNING: "minor",
56 logging.ERROR: "major",
57 logging.CRITICAL: "critical",
58}
59GITHUB_SEVERITIES = {
60 logging.DEBUG: "debug",
61 logging.INFO: "notice",
62 logging.WARNING: "warning",
63 logging.ERROR: "error",
64 logging.CRITICAL: "error",
65}
66GITHUB_COMMAND = (
67 "::{level} file={file},line={line},endLine={end_line},title={title}::{message}"
68)
69
70_logger = logging.getLogger(__name__)
71
72_regex_provider = get_regex_provider()
73
74# Global variable to check if extensions have already been imported in case get_checkers() is called multiple times.
75suricata_check_extensions_imported = False
76
77
78@click.command()
79@click.option(
80 "--rules",
81 "-r",
82 default=".",
83 help="Path to Suricata rules to provide check on.",
84 show_default=True,
85)
86@click.option(
87 "--single-rule",
88 "-s",
89 help="A single Suricata rule to be checked",
90 show_default=False,
91)
92@click.option(
93 "--out",
94 "-o",
95 default=".",
96 help="Path to suricata-check output folder.",
97 show_default=True,
98)
99@click.option(
100 "--log-level",
101 default="INFO",
102 help=f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
103 show_default=True,
104)
105@click.option(
106 "--gitlab",
107 default=False,
108 help="Flag to create CodeClimate output report for GitLab CI/CD.",
109 show_default=True,
110 is_flag=True,
111)
112@click.option(
113 "--github",
114 default=False,
115 help="Flag to write workflow commands to stdout for GitHub CI/CD.",
116 show_default=True,
117 is_flag=True,
118)
119@click.option(
120 "--evaluate-disabled",
121 default=False,
122 help="Flag to evaluate disabled rules.",
123 show_default=True,
124 is_flag=True,
125)
126@click.option(
127 "--issue-severity",
128 default="INFO",
129 help=f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
130 show_default=True,
131)
132@click.option(
133 "--include-all",
134 "-a",
135 default=False,
136 help="Flag to indicate all checker codes should be enabled.",
137 show_default=True,
138 is_flag=True,
139)
140@click.option(
141 "--include",
142 "-i",
143 default=(),
144 help="List of all checker codes to enable.",
145 show_default=True,
146 multiple=True,
147)
148@click.option(
149 "--exclude",
150 "-e",
151 default=(),
152 help="List of all checker codes to disable.",
153 show_default=True,
154 multiple=True,
155)
156@click.help_option("-h", "--help", cls=ClickHelpOption)
157def main( # noqa: PLR0913, PLR0915
158 rules: str = ".",
159 single_rule: Optional[str] = None,
160 out: str = ".",
161 log_level: LogLevel = "DEBUG",
162 gitlab: bool = False,
163 github: bool = False,
164 evaluate_disabled: bool = False,
165 issue_severity: LogLevel = "INFO",
166 include_all: bool = False,
167 include: tuple[str, ...] = (),
168 exclude: tuple[str, ...] = (),
169) -> None:
170 """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.
171
172 Raises:
173 BadParameter: If provided arguments are invalid.
174
175 RuntimeError: If no checkers could be automatically discovered.
176
177 """
178 # Verify that out argument is valid
179 if os.path.exists(out) and not os.path.isdir(out):
180 raise click.BadParameter(f"Error: {out} is not a directory.")
181
182 # Verify that log_level argument is valid
183 if log_level not in LOG_LEVELS:
184 raise click.BadParameter(f"Error: {log_level} is not a valid log level.")
185
186 # Create out directory if non-existent
187 if not os.path.exists(out):
188 os.makedirs(out)
189
190 # Setup logging from a seperate thread
191 queue = multiprocessing.Manager().Queue()
192 queue_handler = logging.handlers.QueueHandler(queue)
193
194 click_handler = ClickHandler(
195 github=github, github_level=getattr(logging, log_level)
196 )
197 logging.basicConfig(
198 level=log_level,
199 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
200 handlers=(queue_handler, click_handler),
201 force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", False) == "TRUE",
202 )
203
204 file_handler = logging.FileHandler(
205 filename=os.path.join(out, "suricata-check.log"),
206 delay=True,
207 )
208 queue_listener = logging.handlers.QueueListener(
209 queue,
210 file_handler,
211 respect_handler_level=True,
212 )
213
214 def _at_exit() -> None:
215 """Cleans up logging listener and handlers before exiting."""
216 queue_listener.enqueue_sentinel()
217 queue_listener.stop()
218 file_handler.flush()
219 file_handler.close()
220 atexit.unregister(_at_exit)
221
222 atexit.register(_at_exit)
223
224 queue_listener.start()
225
226 # Log the arguments:
227 _logger.info("Running suricata-check with the following arguments:")
228 _logger.info("out: %s", out)
229 _logger.info("rules: %s", rules)
230 _logger.info("single_rule: %s", single_rule)
231 _logger.info("log_level: %s", log_level)
232 _logger.info("gitlab: %s", gitlab)
233 _logger.info("github: %s", github)
234 _logger.info("evaluate_disabled: %s", evaluate_disabled)
235 _logger.info("issue_severity: %s", issue_severity)
236 _logger.info("include_all: %s", include_all)
237 _logger.info("include: %s", include)
238 _logger.info("exclude: %s", exclude)
239
240 # Log the environment:
241 _logger.debug("Platform: %s", sys.platform)
242 _logger.debug("Python version: %s", sys.version)
243 _logger.debug("suricata-check path: %s", _suricata_check_path)
244 _logger.debug("suricata-check version: %s", __version__)
245 for package, version in get_dependency_versions().items():
246 _logger.debug("Dependency %s version: %s", package, version)
247
248 # Verify that include and exclude arguments are valid
249 if include_all and len(include) > 0:
250 raise click.BadParameter(
251 "Error: Cannot use --include-all and --include together."
252 )
253 if include_all:
254 include = (".*",)
255
256 # Verify that issue_severity argument is valid
257 if issue_severity not in LOG_LEVELS:
258 raise click.BadParameter(
259 f"Error: {issue_severity} is not a valid issue severity or log level."
260 )
261
262 checkers = get_checkers(
263 include, exclude, issue_severity=getattr(logging, issue_severity)
264 )
265
266 if single_rule is not None:
267 __main_single_rule(out, single_rule, checkers)
268
269 # Return here so no rules file is processed.
270 _at_exit()
271 return
272
273 # Check if the rules argument is valid and find the rules file
274 rules = find_rules_file(rules)
275
276 output = process_rules_file(rules, evaluate_disabled, checkers=checkers)
277
278 __write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)
279
280 _at_exit()
281
282
283def __main_single_rule(
284 out: str, single_rule: str, checkers: Optional[Sequence[CheckerInterface]]
285) -> None:
286 rule: Optional[idstools.rule.Rule] = idstools.rule.parse(single_rule)
287
288 # Verify that a rule was parsed correctly.
289 if rule is None:
290 msg = f"Error parsing rule from user input: {single_rule}"
291 _logger.critical(msg)
292 raise click.BadParameter(f"Error: {msg}")
293
294 if not is_valid_rule(rule):
295 msg = f"Error parsing rule from user input: {single_rule}"
296 _logger.critical(msg)
297 raise click.BadParameter(f"Error: {msg}")
298
299 _logger.debug("Processing rule: %s", rule["sid"])
300
301 rule_report = analyze_rule(rule, checkers=checkers)
302
303 __write_output(OutputReport(rules=[rule_report]), out)
304
305
306def __write_output(
307 output: OutputReport,
308 out: str,
309 gitlab: bool = False,
310 github: bool = False,
311 rules_file: Optional[str] = None,
312) -> None:
313 _logger.info(
314 "Writing output to suricata-check.jsonl and suricata-check-fast.log in %s",
315 os.path.abspath(out),
316 )
317 with (
318 open(
319 os.path.join(out, "suricata-check.jsonl"),
320 "w",
321 buffering=io.DEFAULT_BUFFER_SIZE,
322 ) as jsonl_fh,
323 open(
324 os.path.join(out, "suricata-check-fast.log"),
325 "w",
326 buffering=io.DEFAULT_BUFFER_SIZE,
327 ) as fast_fh,
328 ):
329 rules: RULE_REPORTS_TYPE = output.rules
330 jsonl_fh.write("\n".join([str(rule) for rule in rules]))
331
332 for rule_report in rules:
333 rule: idstools.rule.Rule = rule_report.rule
334 lines: str = (
335 "{}-{}".format(rule_report.line_begin, rule_report.line_end)
336 if rule_report.line_begin
337 else "Unknown"
338 )
339 issues: ISSUES_TYPE = rule_report.issues
340 for issue in issues:
341 code = issue.code
342 severity = (
343 logging.getLevelName(issue.severity) if issue.severity else None
344 )
345 issue_msg = issue.message.replace("\n", " ")
346
347 msg = "[{}]{} Lines {}, sid {}: {}".format(
348 code,
349 f" ({severity})" if severity else "",
350 lines,
351 rule["sid"],
352 issue_msg,
353 )
354 fast_fh.write(msg + "\n")
355 click.secho(msg, color=True, fg="blue")
356
357 if output.summary is not None:
358 __write_output_stats(output, out)
359
360 if gitlab:
361 assert rules_file is not None
362
363 __write_output_gitlab(output, out, rules_file)
364
365 if github:
366 assert rules_file is not None
367
368 __write_output_github(output, rules_file)
369
370
371def __write_output_stats(output: OutputReport, out: str) -> None:
372 assert output.summary is not None
373
374 with open(
375 os.path.join(out, "suricata-check-stats.log"),
376 "w",
377 buffering=io.DEFAULT_BUFFER_SIZE,
378 ) as stats_fh:
379 summary: OutputSummary = output.summary
380
381 overall_summary: SIMPLE_SUMMARY_TYPE = summary.overall_summary
382
383 n_issues = overall_summary["Total Issues"]
384 n_rules = (
385 overall_summary["Rules with Issues"]
386 + overall_summary["Rules without Issues"]
387 )
388
389 stats_fh.write(
390 tabulate.tabulate(
391 (
392 (
393 k,
394 v,
395 (
396 "{:.0%}".format(v / n_rules)
397 if k.startswith("Rules ") and n_rules > 0
398 else "-"
399 ),
400 )
401 for k, v in overall_summary.items()
402 ),
403 headers=(
404 "Count",
405 "Percentage of Rules",
406 ),
407 )
408 + "\n\n",
409 )
410
411 click.secho(
412 f"Total issues found: {overall_summary['Total Issues']}",
413 color=True,
414 bold=True,
415 fg="blue",
416 )
417 click.secho(
418 f"Rules with Issues found: {overall_summary['Rules with Issues']}",
419 color=True,
420 bold=True,
421 fg="blue",
422 )
423
424 issues_by_group: SIMPLE_SUMMARY_TYPE = summary.issues_by_group
425
426 stats_fh.write(
427 tabulate.tabulate(
428 (
429 (k, v, "{:.0%}".format(v / n_issues) if n_issues > 0 else "-")
430 for k, v in issues_by_group.items()
431 ),
432 headers=(
433 "Count",
434 "Percentage of Total Issues",
435 ),
436 )
437 + "\n\n",
438 )
439
440 issues_by_type: EXTENSIVE_SUMMARY_TYPE = summary.issues_by_type
441 for checker, checker_issues_by_type in issues_by_type.items():
442 stats_fh.write(" " + checker + " " + "\n")
443 stats_fh.write("-" * (len(checker) + 2) + "\n")
444 stats_fh.write(
445 tabulate.tabulate(
446 (
447 (
448 k,
449 v,
450 "{:.0%}".format(v / n_rules) if n_rules > 0 else "-",
451 )
452 for k, v in checker_issues_by_type.items()
453 ),
454 headers=(
455 "Count",
456 "Percentage of Rules",
457 ),
458 )
459 + "\n\n",
460 )
461
462
463def __write_output_gitlab(output: OutputReport, out: str, rules_file: str) -> None:
464 with open(
465 os.path.join(out, "suricata-check-gitlab.json"),
466 "w",
467 buffering=io.DEFAULT_BUFFER_SIZE,
468 ) as gitlab_fh:
469 issue_dicts = []
470 for rule_report in output.rules:
471 line_begin: Optional[int] = rule_report.line_begin
472 assert line_begin is not None
473 line_end: Optional[int] = rule_report.line_end
474 assert line_end is not None
475 issues: ISSUES_TYPE = rule_report.issues
476 for issue in issues:
477 code = issue.code
478 issue_msg = issue.message.replace("\n", " ")
479 assert issue.checker is not None
480 issue_checker = issue.checker
481 issue_hash = str(issue.hash)
482 assert issue.severity is not None
483 issue_severity = GITLAB_SEVERITIES[issue.severity]
484
485 issue_dict: Mapping[
486 str,
487 Union[str, list[str], Mapping[str, Union[str, Mapping[str, int]]]],
488 ] = {
489 "description": issue_msg,
490 "categories": [issue_checker],
491 "check_name": f"Suricata Check {code}",
492 "fingerprint": issue_hash,
493 "severity": issue_severity,
494 "location": {
495 "path": rules_file,
496 "lines": {"begin": line_begin, "end": line_end},
497 },
498 }
499 issue_dicts.append(issue_dict)
500
501 gitlab_fh.write(json.dumps(issue_dicts))
502
503
504def __write_output_github(output: OutputReport, rules_file: str) -> None:
505 output_lines: dict[str, list[str]] = {
506 k: [] for k in set(GITHUB_SEVERITIES.values())
507 }
508 for rule_report in output.rules:
509 line_begin: Optional[int] = rule_report.line_begin
510 assert line_begin is not None
511 line_end: Optional[int] = rule_report.line_end
512 assert line_end is not None
513 issues: ISSUES_TYPE = rule_report.issues
514 for issue in issues:
515 code = issue.code
516 issue_msg = issue.message.replace("\n", " ")
517 assert issue.checker is not None
518 issue_checker = issue.checker
519 assert issue.severity is not None
520 issue_severity = GITHUB_SEVERITIES[issue.severity]
521 title = f"{issue_checker} - {code}"
522
523 output_lines[issue_severity].append(
524 GITHUB_COMMAND.format(
525 level=issue_severity,
526 file=rules_file,
527 line=line_begin,
528 end_line=line_end,
529 title=title,
530 message=issue_msg,
531 )
532 )
533
534 for message_level, lines in output_lines.items():
535 if len(lines) > 0:
536 print(f"::group::{message_level}") # noqa: T201
537 for message in lines:
538 print(message) # noqa: T201
539 print("::endgroup::") # noqa: T201
540
541
[docs]
542def process_rules_file( # noqa: C901, PLR0912
543 rules: str,
544 evaluate_disabled: bool,
545 checkers: Optional[Sequence[CheckerInterface]] = None,
546) -> OutputReport:
547 """Processes a rule file and returns a list of rules and their issues.
548
549 Args:
550 rules: A path to a Suricata rules file.
551 evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
552 checkers: The checkers to be used when processing the rule file.
553
554 Returns:
555 A list of rules and their issues.
556
557 Raises:
558 RuntimeError: If no checkers could be automatically discovered.
559
560 """
561 if checkers is None:
562 checkers = get_checkers()
563
564 output = OutputReport()
565
566 with (
567 open(
568 os.path.normpath(rules),
569 buffering=io.DEFAULT_BUFFER_SIZE,
570 ) as rules_fh,
571 ):
572 if len(checkers) == 0:
573 msg = "No checkers provided for processing rules."
574 _logger.error(msg)
575 raise RuntimeError(msg)
576
577 _logger.info("Processing rule file: %s", rules)
578
579 collected_multiline_parts: Optional[str] = None
580 multiline_begin_number: Optional[int] = None
581
582 for number, line in enumerate(rules_fh.readlines(), start=1):
583 # First work on collecting and parsing multiline rules
584 if line.rstrip("\r\n").endswith("\\"):
585 multiline_part = line.rstrip("\r\n")[:-1]
586
587 if collected_multiline_parts is None:
588 collected_multiline_parts = multiline_part
589 multiline_begin_number = number
590 else:
591 collected_multiline_parts += multiline_part.lstrip()
592
593 continue
594
595 # Process final part of multiline rule if one is being collected
596 if collected_multiline_parts is not None:
597 collected_multiline_parts += line.lstrip()
598
599 rule: Optional[idstools.rule.Rule] = idstools.rule.parse(
600 collected_multiline_parts
601 )
602 collected_multiline_parts = None
603 # If no multiline rule is being collected process as a potential single line rule
604 else:
605 if len(line.strip()) == 0:
606 continue
607
608 if line.startswith("#"):
609 if evaluate_disabled:
610 # Verify that this line is a rule and not a comment
611 if idstools.rule.parse(line) is None:
612 # Log the comment since it may be a invalid rule
613 _logger.warning(
614 "Ignoring comment on line %i: %s", number, line
615 )
616 continue
617 else:
618 # Skip the rule
619 continue
620
621 rule: Optional[idstools.rule.Rule] = idstools.rule.parse(line)
622
623 # Verify that a rule was parsed correctly.
624 if rule is None:
625 _logger.error("Error parsing rule on line %i: %s", number, line)
626 continue
627
628 if not is_valid_rule(rule):
629 _logger.error("Invalid rule on line %i: %s", number, line)
630 continue
631
632 _logger.debug("Processing rule: %s on line %i", rule["sid"], number)
633
634 rule_report: RuleReport = analyze_rule(
635 rule,
636 checkers=checkers,
637 )
638 rule_report.line_begin = multiline_begin_number or number
639 rule_report.line_end = number
640 output.rules.append(rule_report)
641
642 multiline_begin_number = None
643
644 _logger.info("Completed processing rule file: %s", rules)
645
646 output.summary = __summarize_output(output, checkers)
647
648 return output
649
650
651def _import_extensions() -> None:
652 global suricata_check_extensions_imported # noqa: PLW0603
653 if suricata_check_extensions_imported is True:
654 return
655
656 for module in pkgutil.iter_modules():
657 if module.name.startswith("suricata_check_"):
658 try:
659 imported_module = __import__(module.name)
660 _logger.info(
661 "Detected and successfully imported suricata-check extension %s with version %s.",
662 module.name.replace("_", "-"),
663 getattr(imported_module, "__version__"),
664 )
665 except ImportError:
666 _logger.warning(
667 "Detected potential suricata-check extension %s but failed to import it.",
668 module.name.replace("_", "-"),
669 )
670 suricata_check_extensions_imported = True
671
672
[docs]
673@lru_cache(maxsize=1)
674def get_checkers(
675 include: Sequence[str] = (".*",),
676 exclude: Sequence[str] = (),
677 issue_severity: int = logging.INFO,
678) -> Sequence[CheckerInterface]:
679 """Auto discovers all available checkers that implement the CheckerInterface.
680
681 Returns:
682 A list of available checkers that implement the CheckerInterface.
683
684 """
685 # Check for extensions and try to import them
686 _import_extensions()
687
688 checkers: list[CheckerInterface] = []
689 for checker in get_all_subclasses(CheckerInterface):
690 if checker.__name__ == DummyChecker.__name__:
691 continue
692
693 # Initialize DummyCheckers to retrieve error messages.
694 if issubclass(checker, DummyChecker):
695 checker()
696
697 enabled, relevant_codes = __get_checker_enabled(
698 checker, include, exclude, issue_severity
699 )
700
701 if enabled:
702 checkers.append(checker(include=relevant_codes))
703
704 else:
705 _logger.info("Checker %s is disabled.", checker.__name__)
706
707 _logger.info(
708 "Discovered and enabled checkers: [%s]",
709 ", ".join([c.__class__.__name__ for c in checkers]),
710 )
711 if len(checkers) == 0:
712 _logger.warning(
713 "No checkers were enabled. Check the include and exclude arguments."
714 )
715
716 # Perform a uniqueness check on the codes emmitted by the checkers
717 for checker1 in checkers:
718 for checker2 in checkers:
719 if checker1 == checker2:
720 continue
721 if not set(checker1.codes).isdisjoint(checker2.codes):
722 msg = f"Checker {checker1.__class__.__name__} and {checker2.__class__.__name__} have overlapping codes."
723 _logger.error(msg)
724
725 return sorted(checkers, key=lambda x: x.__class__.__name__)
726
727
728def __get_checker_enabled(
729 checker: type[CheckerInterface],
730 include: Sequence[str],
731 exclude: Sequence[str],
732 issue_severity: int,
733) -> tuple[bool, set[str]]:
734 enabled = checker.enabled_by_default
735
736 # If no include regexes are provided, include all by default
737 if len(include) == 0:
738 relevant_codes = set(checker.codes.keys())
739 else:
740 # If include regexes are provided, include all codes that match any of these regexes
741 relevant_codes = set()
742
743 for regex in include:
744 relevant_codes.update(
745 set(
746 filter(
747 lambda code: _regex_provider.compile("^" + regex + "$").match(
748 code
749 )
750 is not None,
751 checker.codes.keys(),
752 )
753 )
754 )
755
756 if len(relevant_codes) > 0:
757 enabled = True
758
759 # Now remove the codes that are excluded according to any of the provided exclude regexes
760 for regex in exclude:
761 relevant_codes = set(
762 filter(
763 lambda code: _regex_provider.compile("^" + regex + "$").match(code)
764 is None,
765 relevant_codes,
766 )
767 )
768
769 # Now filter out irrelevant codes based on severity
770 relevant_codes = set(
771 filter(
772 lambda code: checker.codes[code]["severity"] >= issue_severity,
773 relevant_codes,
774 )
775 )
776
777 if len(relevant_codes) == 0:
778 enabled = False
779
780 return enabled, relevant_codes
781
782
[docs]
783def analyze_rule(
784 rule: idstools.rule.Rule,
785 checkers: Optional[Sequence[CheckerInterface]] = None,
786) -> RuleReport:
787 """Checks a rule and returns a dictionary containing the rule and a list of issues found.
788
789 Args:
790 rule: The rule to be checked.
791 checkers: The checkers to be used to check the rule.
792
793 Returns:
794 A list of issues found in the rule.
795 Each issue is typed as a `dict`.
796
797 Raises:
798 InvalidRuleError: If the rule does not follow the Suricata syntax.
799
800 """
801 if not is_valid_rule(rule):
802 raise InvalidRuleError(rule["raw"])
803
804 check_rule_option_recognition(rule)
805
806 if checkers is None:
807 checkers = get_checkers()
808
809 rule_report: RuleReport = RuleReport(rule=rule)
810
811 for checker in checkers:
812 try:
813 rule_report.add_issues(checker.check_rule(rule))
814 except Exception as exception: # noqa: BLE001
815 _logger.warning(
816 "Failed to run %s on rule: %s",
817 checker.__class__.__name__,
818 rule["raw"],
819 extra={"exception": exception},
820 )
821
822 rule_report.summary = __summarize_rule(rule_report, checkers)
823
824 return rule_report
825
826
827def __summarize_rule(
828 rule: RuleReport,
829 checkers: Optional[Sequence[CheckerInterface]] = None,
830) -> RULE_SUMMARY_TYPE:
831 """Summarizes the issues found in a rule.
832
833 Args:
834 rule: The rule output dictionary to be summarized.
835 checkers: The checkers to be used to check the rule.
836
837 Returns:
838 A dictionary containing a summary of all issues found in the rule.
839
840 """
841 if checkers is None:
842 checkers = get_checkers()
843
844 summary = {}
845
846 issues: ISSUES_TYPE = rule.issues
847 summary["total_issues"] = len(issues)
848 summary["issues_by_group"] = defaultdict(int)
849 for issue in issues:
850 checker = issue.checker
851 summary["issues_by_group"][checker] += 1
852
853 # Ensure also checkers without issues are included in the report.
854 for checker in checkers:
855 if checker.__class__.__name__ not in summary["issues_by_group"]:
856 summary["issues_by_group"][checker.__class__.__name__] = 0
857
858 # Sort dictionaries for deterministic output
859 summary["issues_by_group"] = __sort_mapping(summary["issues_by_group"])
860
861 return summary
862
863
864def __summarize_output(
865 output: OutputReport,
866 checkers: Optional[Sequence[CheckerInterface]] = None,
867) -> OutputSummary:
868 """Summarizes the issues found in a rules file.
869
870 Args:
871 output: The unsammarized output of the rules file containing all rules and their issues.
872 checkers: The checkers to be used to check the rule.
873
874 Returns:
875 A dictionary containing a summary of all issues found in the rules file.
876
877 """
878 if checkers is None:
879 checkers = get_checkers()
880
881 return OutputSummary(
882 overall_summary=__get_overall_summary(output),
883 issues_by_group=__get_issues_by_group(output, checkers),
884 issues_by_type=__get_issues_by_type(output, checkers),
885 )
886
887
888def __get_overall_summary(
889 output: OutputReport,
890) -> SIMPLE_SUMMARY_TYPE:
891 overall_summary = {
892 "Total Issues": 0,
893 "Rules with Issues": 0,
894 "Rules without Issues": 0,
895 }
896
897 rules: RULE_REPORTS_TYPE = output.rules
898 for rule in rules:
899 issues: ISSUES_TYPE = rule.issues
900 overall_summary["Total Issues"] += len(issues)
901
902 if len(issues) == 0:
903 overall_summary["Rules without Issues"] += 1
904 else:
905 overall_summary["Rules with Issues"] += 1
906
907 return overall_summary
908
909
910def __get_issues_by_group(
911 output: OutputReport,
912 checkers: Optional[Sequence[CheckerInterface]] = None,
913) -> SIMPLE_SUMMARY_TYPE:
914 if checkers is None:
915 checkers = get_checkers()
916
917 issues_by_group = defaultdict(int)
918
919 # Ensure also checkers and codes without issues are included in the report.
920 for checker in checkers:
921 issues_by_group[checker.__class__.__name__] = 0
922
923 rules: RULE_REPORTS_TYPE = output.rules
924 for rule in rules:
925 issues: ISSUES_TYPE = rule.issues
926
927 for issue in issues:
928 checker = issue.checker
929 if checker is not None:
930 issues_by_group[checker] += 1
931
932 return __sort_mapping(issues_by_group)
933
934
935def __get_issues_by_type(
936 output: OutputReport,
937 checkers: Optional[Sequence[CheckerInterface]] = None,
938) -> EXTENSIVE_SUMMARY_TYPE:
939 if checkers is None:
940 checkers = get_checkers()
941 issues_by_type: EXTENSIVE_SUMMARY_TYPE = defaultdict(lambda: defaultdict(int))
942
943 # Ensure also checkers and codes without issues are included in the report.
944 for checker in checkers:
945 for code in checker.codes:
946 issues_by_type[checker.__class__.__name__][code] = 0
947
948 rules: RULE_REPORTS_TYPE = output.rules
949 for rule in rules:
950 issues: ISSUES_TYPE = rule.issues
951
952 checker_codes = defaultdict(lambda: defaultdict(int))
953 for issue in issues:
954 checker = issue.checker
955 if checker is not None:
956 code = issue.code
957 checker_codes[checker][code] += 1
958
959 for checker, codes in checker_codes.items():
960 for code, count in codes.items():
961 issues_by_type[checker][code] += count
962
963 for key in issues_by_type:
964 issues_by_type[key] = __sort_mapping(issues_by_type[key])
965
966 return __sort_mapping(issues_by_type)
967
968
969def __sort_mapping(mapping: Mapping) -> dict:
970 return {key: mapping[key] for key in sorted(mapping.keys())}
971
972
973if __name__ == "__main__":
974 main()