1"""The `suricata_check.suricata_check` module contains the command line utility and the main program logic."""
2
3import atexit
4import configparser
5import io
6import json
7import logging
8import logging.handlers
9import multiprocessing
10import os
11import pkgutil
12import sys
13from collections import defaultdict
14from collections.abc import Mapping, Sequence
15from functools import lru_cache
16from typing import (
17 Any,
18 Literal,
19 Optional,
20 TypeVar,
21 Union,
22 overload,
23)
24
25import click
26import idstools.rule
27import tabulate
28
29# Add suricata-check to the front of the PATH, such that the version corresponding to the CLI is used.
30_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
31if sys.path[0] != _suricata_check_path:
32 sys.path.insert(0, _suricata_check_path)
33
34from suricata_check import __version__, get_dependency_versions # noqa: E402
35from suricata_check.checkers.interface import CheckerInterface # noqa: E402
36from suricata_check.checkers.interface.dummy import DummyChecker # noqa: E402
37from suricata_check.utils._click import ClickHandler, ClickHelpOption # noqa: E402
38from suricata_check.utils._path import find_rules_file # noqa: E402
39from suricata_check.utils.checker import ( # noqa: E402
40 check_rule_option_recognition,
41 get_rule_suboption,
42)
43from suricata_check.utils.checker_typing import ( # noqa: E402
44 EXTENSIVE_SUMMARY_TYPE,
45 ISSUES_TYPE,
46 RULE_REPORTS_TYPE,
47 RULE_SUMMARY_TYPE,
48 SIMPLE_SUMMARY_TYPE,
49 InvalidRuleError,
50 OutputReport,
51 OutputSummary,
52 RuleReport,
53 get_all_subclasses,
54)
55from suricata_check.utils.regex import get_regex_provider, is_valid_rule # noqa: E402
56
# Names accepted for both the `--log-level` and the `--issue-severity` options.
LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]

# Translation of `logging` levels to the severity names written to the GitLab report.
GITLAB_SEVERITIES = {
    logging.DEBUG: "info",
    logging.INFO: "info",
    logging.WARNING: "minor",
    logging.ERROR: "major",
    logging.CRITICAL: "critical",
}

# Translation of `logging` levels to the levels used in GitHub workflow commands.
GITHUB_SEVERITIES = {
    logging.DEBUG: "debug",
    logging.INFO: "notice",
    logging.WARNING: "warning",
    logging.ERROR: "error",
    logging.CRITICAL: "error",
}

# Template of a single GitHub workflow command annotating a range of lines in a file.
GITHUB_COMMAND = (
    "::{level} "
    "file={file},line={line},endLine={end_line},title={title}"
    "::{message}"
)
76
# Module-level logger used by all functions in this module.
_logger = logging.getLogger(__name__)

# Regex provider used to compile the include/exclude/ignore patterns for checker codes.
_regex_provider = get_regex_provider()

# Global flag recording whether extensions were already imported,
# so that repeated calls to get_checkers() do not import them again.
suricata_check_extensions_imported = False
83
84
@click.command()
# NOTE: `--ini` deliberately has no `-i` short flag: `-i` is the short flag of `--include`,
# and declaring the same short flag twice makes it ambiguous which option it refers to.
@click.option(
    "--ini",
    help="Path to suricata-check.ini file to read configuration from.",
    show_default=True,
)
@click.option(
    "--rules",
    "-r",
    help="Path to Suricata rules to provide check on.",
    show_default=True,
)
@click.option(
    "--single-rule",
    "-s",
    help="A single Suricata rule to be checked",
    show_default=False,
)
@click.option(
    "--out",
    "-o",
    help="Path to suricata-check output folder.",
    show_default=True,
)
@click.option(
    "--log-level",
    help=f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
    show_default=True,
)
@click.option(
    "--gitlab",
    help="Flag to create CodeClimate output report for GitLab CI/CD.",
    show_default=True,
    is_flag=True,
)
@click.option(
    "--github",
    help="Flag to write workflow commands to stdout for GitHub CI/CD.",
    show_default=True,
    is_flag=True,
)
@click.option(
    "--evaluate-disabled",
    help="Flag to evaluate disabled rules.",
    show_default=True,
    is_flag=True,
)
@click.option(
    "--issue-severity",
    help=f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
    show_default=True,
)
@click.option(
    "--include-all",
    "-a",
    help="Flag to indicate all checker codes should be enabled.",
    show_default=True,
    is_flag=True,
)
@click.option(
    "--include",
    "-i",
    help="List of all checker codes to enable.",
    show_default=True,
    multiple=True,
)
@click.option(
    "--exclude",
    "-e",
    help="List of all checker codes to disable.",
    show_default=True,
    multiple=True,
)
@click.help_option("-h", "--help", cls=ClickHelpOption)
def main(  # noqa: PLR0915
    **kwargs: Any,
) -> None:
    """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.

    Command line arguments take precedence over values read from the INI file,
    which in turn take precedence over the built-in defaults.

    Raises:
        BadParameter: If provided arguments are invalid.

        RuntimeError: If no checkers could be automatically discovered.

    """
    # Look for a ini file and parse it.
    ini_kwargs = __get_ini_kwargs(
        str(kwargs["ini"]) if kwargs["ini"] is not None else None
    )

    # Verify CLI argument types and get CLI arguments or use default arguments
    sources = [kwargs, ini_kwargs]
    rules: str = __get_verified_kwarg(sources, "rules", str, False, ".")
    single_rule: Optional[str] = __get_verified_kwarg(
        sources, "single_rule", str, True, None
    )
    out: str = __get_verified_kwarg(sources, "out", str, False, ".")
    log_level: LogLevel = __get_verified_kwarg(
        sources, "log_level", str, False, "DEBUG"
    )
    gitlab: bool = __get_verified_kwarg(sources, "gitlab", bool, False, False)
    github: bool = __get_verified_kwarg(sources, "github", bool, False, False)
    evaluate_disabled: bool = __get_verified_kwarg(
        sources, "evaluate_disabled", bool, False, False
    )
    issue_severity: LogLevel = __get_verified_kwarg(
        sources, "issue_severity", str, False, "INFO"
    )
    include_all: bool = __get_verified_kwarg(
        sources, "include_all", bool, False, False
    )
    include: tuple[str, ...] = __get_verified_kwarg(
        sources, "include", tuple, False, ()
    )
    exclude: tuple[str, ...] = __get_verified_kwarg(
        sources, "exclude", tuple, False, ()
    )

    # Verify that out argument is valid
    if os.path.exists(out) and not os.path.isdir(out):
        raise click.BadParameter(f"Error: {out} is not a directory.")

    # Verify that log_level argument is valid
    if log_level not in LOG_LEVELS:
        raise click.BadParameter(f"Error: {log_level} is not a valid log level.")

    # Create out directory if non-existent
    if not os.path.exists(out):
        os.makedirs(out)

    # Setup logging from a separate thread:
    # records are pushed onto a queue and written to the log file by a queue listener.
    queue = multiprocessing.Manager().Queue()
    queue_handler = logging.handlers.QueueHandler(queue)

    click_handler = ClickHandler(
        github=github, github_level=getattr(logging, log_level)
    )
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=(queue_handler, click_handler),
        force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", False) == "TRUE",
    )

    file_handler = logging.FileHandler(
        filename=os.path.join(out, "suricata-check.log"),
        delay=True,
    )
    queue_listener = logging.handlers.QueueListener(
        queue,
        file_handler,
        respect_handler_level=True,
    )

    def _at_exit() -> None:
        """Cleans up logging listener and handlers before exiting."""
        queue_listener.enqueue_sentinel()
        queue_listener.stop()
        file_handler.flush()
        file_handler.close()
        # Unregister so the cleanup does not run a second time at interpreter exit.
        atexit.unregister(_at_exit)

    atexit.register(_at_exit)

    queue_listener.start()

    # Log the arguments:
    _logger.info("Running suricata-check with the following arguments:")
    _logger.info("out: %s", out)
    _logger.info("rules: %s", rules)
    _logger.info("single_rule: %s", single_rule)
    _logger.info("log_level: %s", log_level)
    _logger.info("gitlab: %s", gitlab)
    _logger.info("github: %s", github)
    _logger.info("evaluate_disabled: %s", evaluate_disabled)
    _logger.info("issue_severity: %s", issue_severity)
    _logger.info("include_all: %s", include_all)
    _logger.info("include: %s", include)
    _logger.info("exclude: %s", exclude)

    # Log the environment:
    _logger.debug("Platform: %s", sys.platform)
    _logger.debug("Python version: %s", sys.version)
    _logger.debug("suricata-check path: %s", _suricata_check_path)
    _logger.debug("suricata-check version: %s", __version__)
    for package, version in get_dependency_versions().items():
        _logger.debug("Dependency %s version: %s", package, version)

    # Verify that include and exclude arguments are valid
    if include_all and len(include) > 0:
        raise click.BadParameter(
            "Error: Cannot use --include-all and --include together."
        )
    if include_all:
        include = (".*",)

    # Verify that issue_severity argument is valid
    if issue_severity not in LOG_LEVELS:
        raise click.BadParameter(
            f"Error: {issue_severity} is not a valid issue severity or log level."
        )

    checkers = get_checkers(
        include, exclude, issue_severity=getattr(logging, issue_severity)
    )

    if single_rule is not None:
        __main_single_rule(out, single_rule, checkers)

        # Return here so no rules file is processed.
        _at_exit()
        return

    # Check if the rules argument is valid and find the rules file
    rules = find_rules_file(rules)

    output = process_rules_file(rules, evaluate_disabled, checkers=checkers)

    __write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)

    _at_exit()
310
311
def __get_ini_kwargs(path: Optional[str]) -> dict[str, Any]:  # noqa: C901, PLR0912
    """Reads the options of the `suricata-check` section of an INI file.

    Args:
        path: Path to the INI file, or `None` to fall back to `suricata-check.ini`
            in the current working directory.

    Returns:
        A dictionary mapping argument names (as used by `main`) to the values found in the INI file.
        The dictionary is empty if no path was given and the default INI file does not exist.

    Raises:
        BadParameter: If a path was given explicitly but does not exist.

    """
    if path is not None:
        if not os.path.exists(path):
            raise click.BadParameter(
                f"Error: INI file provided in {path} but no options loaded"
            )
    else:
        # Use the default path if no path was provided
        path = "suricata-check.ini"
        if not os.path.exists(path):
            return {}

    section = "suricata-check"
    config_parser = configparser.ConfigParser(
        empty_lines_in_values=False,
        default_section=section,
        # Tuples are written as JSON lists in the INI file.
        converters={"tuple": lambda x: tuple(json.loads(x))},
    )
    config_parser.read(path)

    get_str = config_parser.get
    get_bool = config_parser.getboolean
    get_tuple = config_parser.gettuple  # type: ignore reportAttributeAccessIssue

    # (argument name, accepted INI keys in order of preference, getter).
    # The first accepted key is the historical spelling; alternative hyphen/underscore
    # spellings are accepted as well. The log level is stored as `log_level`
    # because that is the name `main` looks up.
    options = (
        ("rules", ("rules",), get_str),
        ("out", ("out",), get_str),
        ("log_level", ("log-level", "log_level", "log"), get_str),
        ("gitlab", ("gitlab",), get_bool),
        ("github", ("github",), get_bool),
        ("evaluate_disabled", ("evaluate_disabled", "evaluate-disabled"), get_bool),
        ("issue_severity", ("issue-severity", "issue_severity"), get_str),
        ("include_all", ("include-all", "include_all"), get_bool),
        ("include", ("include",), get_tuple),
        ("exclude", ("exclude",), get_tuple),
    )

    ini_kwargs: dict[str, Any] = {}
    for kwarg_name, ini_keys, getter in options:
        for ini_key in ini_keys:
            if config_parser.has_option(section, ini_key):
                ini_kwargs[kwarg_name] = getter(section, ini_key)
                break

    return ini_kwargs
362
363
D = TypeVar("D")


@overload
def __get_verified_kwarg(
    kwargss: Sequence[dict[str, Any]],
    name: str,
    expected_type: type,
    optional: Literal[True],
    default: D,
) -> Optional[D]:
    pass


@overload
def __get_verified_kwarg(
    kwargss: Sequence[dict[str, Any]],
    name: str,
    expected_type: type,
    optional: Literal[False],
    default: D,
) -> D:
    pass


def __get_verified_kwarg(
    kwargss: Sequence[dict[str, Any]],
    name: str,
    expected_type: type,
    optional: bool,
    default: D,
) -> Optional[D]:
    """Looks up an argument in several sources and verifies its type.

    The sources are consulted in order and the first one providing an actual value wins.
    A source that does not contain the argument, contains `None` for it,
    or contains the default object itself is treated as not providing a value,
    so that a later source (e.g. the INI file) can still provide one.

    Args:
        kwargss: The argument sources in order of precedence.
        name: The name of the argument to look up.
        expected_type: The type the value is required to be an instance of.
        optional: Whether `None` is an acceptable result.
        default: The value to fall back to when no source provides a value.

    Returns:
        The first value found, `None` if the argument is optional and was only ever given as `None`,
        or the default otherwise.

    Raises:
        BadParameter: If a provided value is not an instance of the expected type.

    """
    seen_none = False

    for kwargs in kwargss:
        if name not in kwargs:
            continue

        value = kwargs[name]

        if value is None:
            # An unset option; keep looking in the remaining sources.
            seen_none = True
            continue

        # Identity (not equality) is used on purpose: an explicitly passed value that
        # merely equals the default should still take precedence over later sources.
        if value is default:
            continue

        if not isinstance(value, expected_type):
            raise click.BadParameter(
                f"Error: Argument `{name}` should have a value of type `{expected_type}` "
                f"but has value {value} of type {value.__class__} instead."
            )
        return value

    if seen_none and optional:
        return None

    return default
413
414
def __main_single_rule(
    out: str, single_rule: str, checkers: Optional[Sequence[CheckerInterface]]
) -> None:
    """Checks a single rule provided as text and writes the resulting report to the output folder.

    Args:
        out: Path to the output folder.
        single_rule: The text of the rule to check.
        checkers: The checkers to run on the rule.

    Raises:
        BadParameter: If the text could not be parsed into a valid rule.

    """
    parsed: Optional[idstools.rule.Rule] = idstools.rule.parse(single_rule)

    # Both an unparsable and an invalid rule are reported in the same way.
    if parsed is None or not is_valid_rule(parsed):
        msg = f"Error parsing rule from user input: {single_rule}"
        _logger.critical(msg)
        raise click.BadParameter(f"Error: {msg}")

    _logger.debug("Processing rule: %s", parsed["sid"])

    report = analyze_rule(parsed, checkers=checkers)
    __write_output(OutputReport(rules=[report]), out)
436
437
def __write_output(
    output: OutputReport,
    out: str,
    gitlab: bool = False,
    github: bool = False,
    rules_file: Optional[str] = None,
) -> None:
    """Writes the report to the output folder and echoes every issue to the terminal.

    Always writes `suricata-check.jsonl` and `suricata-check-fast.log`.
    The statistics, GitLab and GitHub outputs are produced only when applicable.

    Args:
        output: The report to write.
        out: Path to the output folder.
        gitlab: Whether to write the GitLab report.
        github: Whether to print GitHub workflow commands.
        rules_file: Path of the processed rules file; required when `gitlab` or `github` is set.

    """
    _logger.info(
        "Writing output to suricata-check.jsonl and suricata-check-fast.log in %s",
        os.path.abspath(out),
    )

    jsonl_path = os.path.join(out, "suricata-check.jsonl")
    fast_path = os.path.join(out, "suricata-check-fast.log")

    with (
        open(jsonl_path, "w", buffering=io.DEFAULT_BUFFER_SIZE) as jsonl_fh,
        open(fast_path, "w", buffering=io.DEFAULT_BUFFER_SIZE) as fast_fh,
    ):
        reports: RULE_REPORTS_TYPE = output.rules

        # One serialized rule report per line.
        jsonl_fh.write("\n".join([str(report) for report in reports]))

        for report in reports:
            parsed_rule: idstools.rule.Rule = report.rule

            if report.line_begin:
                line_span = f"{report.line_begin}-{report.line_end}"
            else:
                line_span = "Unknown"

            for issue in report.issues:
                level_name = (
                    logging.getLevelName(issue.severity) if issue.severity else None
                )
                severity_part = f" ({level_name})" if level_name else ""
                flat_message = issue.message.replace("\n", " ")

                msg = (
                    f"[{issue.code}]{severity_part} Lines {line_span}, "
                    f"sid {parsed_rule['sid']}: {flat_message}"
                )
                fast_fh.write(msg + "\n")
                click.secho(msg, color=True, fg="blue")

    if output.summary is not None:
        __write_output_stats(output, out)

    if gitlab:
        assert rules_file is not None

        __write_output_gitlab(output, out, rules_file)

    if github:
        assert rules_file is not None

        __write_output_github(output, rules_file)
501
502
def __write_output_stats(output: OutputReport, out: str) -> None:
    """Writes the summary tables to `suricata-check-stats.log` and echoes the totals to the terminal.

    Args:
        output: The report to write the statistics of. Its summary must be set.
        out: Path to the output folder.

    """
    assert output.summary is not None

    def _share(count: int, total: int) -> str:
        """Formats `count` as a percentage of `total`, or `-` when there is nothing to divide by."""
        return "{:.0%}".format(count / total) if total > 0 else "-"

    stats_path = os.path.join(out, "suricata-check-stats.log")
    with open(stats_path, "w", buffering=io.DEFAULT_BUFFER_SIZE) as stats_fh:
        summary: OutputSummary = output.summary

        overall: SIMPLE_SUMMARY_TYPE = summary.overall_summary
        n_issues = overall["Total Issues"]
        n_rules = overall["Rules with Issues"] + overall["Rules without Issues"]

        # Only the per-rule counters are expressed as a percentage of rules.
        overall_rows = (
            (
                label,
                count,
                _share(count, n_rules) if label.startswith("Rules ") else "-",
            )
            for label, count in overall.items()
        )
        overall_table = tabulate.tabulate(
            overall_rows,
            headers=("Count", "Percentage of Rules"),
        )
        stats_fh.write(overall_table + "\n\n")

        click.secho(
            f"Total issues found: {overall['Total Issues']}",
            color=True,
            bold=True,
            fg="blue",
        )
        click.secho(
            f"Rules with Issues found: {overall['Rules with Issues']}",
            color=True,
            bold=True,
            fg="blue",
        )

        by_group: SIMPLE_SUMMARY_TYPE = summary.issues_by_group
        group_rows = (
            (label, count, _share(count, n_issues))
            for label, count in by_group.items()
        )
        group_table = tabulate.tabulate(
            group_rows,
            headers=("Count", "Percentage of Total Issues"),
        )
        stats_fh.write(group_table + "\n\n")

        by_type: EXTENSIVE_SUMMARY_TYPE = summary.issues_by_type
        for checker_name, code_counts in by_type.items():
            # Checker name as a heading, underlined with dashes.
            stats_fh.write(" " + checker_name + " " + "\n")
            stats_fh.write("-" * (len(checker_name) + 2) + "\n")

            type_rows = (
                (label, count, _share(count, n_rules))
                for label, count in code_counts.items()
            )
            type_table = tabulate.tabulate(
                type_rows,
                headers=("Count", "Percentage of Rules"),
            )
            stats_fh.write(type_table + "\n\n")
593
594
def __write_output_gitlab(output: OutputReport, out: str, rules_file: str) -> None:
    """Writes all issues as a JSON list of issue dictionaries to `suricata-check-gitlab.json`.

    Args:
        output: The report containing the rules and their issues.
        out: Path to the output folder.
        rules_file: Path of the rules file, used as location of every issue.

    """
    gitlab_path = os.path.join(out, "suricata-check-gitlab.json")
    with open(gitlab_path, "w", buffering=io.DEFAULT_BUFFER_SIZE) as gitlab_fh:
        entries = []

        for report in output.rules:
            first_line: Optional[int] = report.line_begin
            assert first_line is not None
            last_line: Optional[int] = report.line_end
            assert last_line is not None

            for issue in report.issues:
                description = issue.message.replace("\n", " ")
                assert issue.checker is not None
                fingerprint = str(issue.hash)
                assert issue.severity is not None
                severity = GITLAB_SEVERITIES[issue.severity]

                entry: Mapping[
                    str,
                    Union[str, list[str], Mapping[str, Union[str, Mapping[str, int]]]],
                ] = {
                    "description": description,
                    "categories": [issue.checker],
                    "check_name": f"Suricata Check {issue.code}",
                    "fingerprint": fingerprint,
                    "severity": severity,
                    "location": {
                        "path": rules_file,
                        "lines": {"begin": first_line, "end": last_line},
                    },
                }
                entries.append(entry)

        gitlab_fh.write(json.dumps(entries))
634
635
def __write_output_github(output: OutputReport, rules_file: str) -> None:
    """Prints all issues to stdout as GitHub workflow commands, grouped per message level.

    Args:
        output: The report containing the rules and their issues.
        rules_file: Path of the rules file, used as file of every annotation.

    """
    commands_by_level: dict[str, list[str]] = {
        level: [] for level in set(GITHUB_SEVERITIES.values())
    }

    for report in output.rules:
        first_line: Optional[int] = report.line_begin
        assert first_line is not None
        last_line: Optional[int] = report.line_end
        assert last_line is not None

        for issue in report.issues:
            flat_message = issue.message.replace("\n", " ")
            assert issue.checker is not None
            assert issue.severity is not None
            level = GITHUB_SEVERITIES[issue.severity]

            command = GITHUB_COMMAND.format(
                level=level,
                file=rules_file,
                line=first_line,
                end_line=last_line,
                title=f"{issue.checker} - {issue.code}",
                message=flat_message,
            )
            commands_by_level[level].append(command)

    for level, commands in commands_by_level.items():
        # Levels without any issue do not get a group.
        if len(commands) == 0:
            continue

        print(f"::group::{level}")  # noqa: T201
        for command in commands:
            print(command)  # noqa: T201
        print("::endgroup::")  # noqa: T201
672
673
def process_rules_file(  # noqa: C901, PLR0912, PLR0915
    rules: str,
    evaluate_disabled: bool,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputReport:
    """Processes a rule file and returns a list of rules and their issues.

    Lines ending in a backslash are joined with the following line(s) into a single rule.
    Empty lines are skipped. Lines starting with `#` are skipped unless `evaluate_disabled`
    is set and the line can be parsed as a rule.

    Args:
        rules: A path to a Suricata rules file.
        evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
        checkers: The checkers to be used when processing the rule file.

    Returns:
        A report with one rule report per successfully parsed rule, and a summary.

    Raises:
        RuntimeError: If no checkers could be automatically discovered.

    """
    if checkers is None:
        checkers = get_checkers()

    output = OutputReport()

    with open(
        os.path.normpath(rules),
        buffering=io.DEFAULT_BUFFER_SIZE,
    ) as rules_fh:
        if len(checkers) == 0:
            msg = "No checkers provided for processing rules."
            _logger.error(msg)
            raise RuntimeError(msg)

        _logger.info("Processing rule file: %s", rules)

        collected_multiline_parts: Optional[str] = None
        multiline_begin_number: Optional[int] = None

        for number, line in enumerate(rules_fh, start=1):
            # First work on collecting and parsing multiline rules
            if line.rstrip("\r\n").endswith("\\"):
                multiline_part = line.rstrip("\r\n")[:-1]

                if collected_multiline_parts is None:
                    collected_multiline_parts = multiline_part
                    multiline_begin_number = number
                else:
                    collected_multiline_parts += multiline_part.lstrip()

                continue

            # Process final part of multiline rule if one is being collected
            if collected_multiline_parts is not None:
                collected_multiline_parts += line.lstrip()

                rule_line = collected_multiline_parts.strip()
                line_begin = multiline_begin_number or number

                # Reset the multiline state right away, such that a rule that fails to parse
                # below cannot leak its starting line number into the next rule.
                collected_multiline_parts = None
                multiline_begin_number = None
            # If no multiline rule is being collected process as a potential single line rule
            else:
                if len(line.strip()) == 0:
                    continue

                if line.strip().startswith("#"):
                    if not evaluate_disabled:
                        # Skip the rule
                        continue

                    # Verify that this line is a rule and not a comment
                    if not __is_valid_idstools_rule(line):
                        # Log the comment since it may be a invalid rule
                        _logger.warning(
                            "Ignoring comment on line %i: %s", number, line
                        )
                        continue

                rule_line = line.strip()
                line_begin = number

            try:
                rule: Optional[idstools.rule.Rule] = idstools.rule.parse(rule_line)
            except Exception:  # noqa: BLE001
                _logger.error(
                    "Internal error in idstools parsing rule on line %i: %s",
                    number,
                    rule_line,
                )
                rule = None

            # Verify that a rule was parsed correctly.
            if rule is None:
                _logger.error("Error parsing rule on line %i: %s", number, rule_line)
                continue

            if not is_valid_rule(rule):
                _logger.error("Invalid rule on line %i: %s", number, rule_line)
                continue

            _logger.debug("Processing rule: %s on line %i", rule["sid"], number)

            # Parse comment and potential ignore comment to ignore rules
            ignore = __parse_type_ignore(rule)

            rule_report: RuleReport = analyze_rule(
                rule,
                checkers=checkers,
                ignore=ignore,
            )
            rule_report.line_begin = line_begin
            rule_report.line_end = number

            output.rules.append(rule_report)

        # A trailing backslash on the last line leaves a rule that was never completed.
        if collected_multiline_parts is not None:
            _logger.warning(
                "Ignoring unterminated multiline rule starting on line %i: %s",
                multiline_begin_number,
                collected_multiline_parts,
            )

    _logger.info("Completed processing rule file: %s", rules)

    output.summary = __summarize_output(output, checkers)

    return output
795
796
def __is_valid_idstools_rule(text: str) -> bool:
    """Returns whether idstools parses the text into a rule without raising an exception."""
    try:
        parsed: Optional[idstools.rule.Rule] = idstools.rule.parse(text)
    except Exception:  # noqa: BLE001
        return False

    return parsed is not None
807
808
def __parse_type_ignore(rule: Optional[idstools.rule.Rule]) -> Optional[Sequence[str]]:
    """Extracts the comma-separated `suricata-check` metadata value of a rule.

    Args:
        rule: The parsed rule, or `None` if parsing failed.

    Returns:
        `None` if no rule was given, an empty list if the rule has no `suricata-check` metadata,
        and otherwise the parts of the value after stripping surrounding spaces and double quotes.

    """
    if rule is None:
        return None

    raw_value = get_rule_suboption(rule, "metadata", "suricata-check")

    return [] if raw_value is None else raw_value.strip(' "').split(",")
818
819
def _import_extensions() -> None:
    """Imports all installed modules whose name starts with `suricata_check_`.

    The import is attempted only once per process; subsequent calls return immediately.
    Extensions that fail to import are logged and skipped.
    """
    global suricata_check_extensions_imported  # noqa: PLW0603
    if suricata_check_extensions_imported is True:
        return

    for module in pkgutil.iter_modules():
        if not module.name.startswith("suricata_check_"):
            continue

        extension_name = module.name.replace("_", "-")
        try:
            imported_module = __import__(module.name)
        except ImportError:
            _logger.warning(
                "Detected potential suricata-check extension %s but failed to import it.",
                extension_name,
            )
            continue

        _logger.info(
            "Detected and successfully imported suricata-check extension %s with version %s.",
            extension_name,
            # Extensions are not guaranteed to define `__version__`.
            getattr(imported_module, "__version__", "unknown"),
        )

    suricata_check_extensions_imported = True
840
841
@lru_cache(maxsize=1)
def get_checkers(
    include: Sequence[str] = (".*",),
    exclude: Sequence[str] = (),
    issue_severity: int = logging.INFO,
) -> Sequence[CheckerInterface]:
    """Auto discovers all available checkers that implement the CheckerInterface.

    The result is cached with `lru_cache`, so all arguments must be hashable
    (e.g. tuples rather than lists).

    Args:
        include: Regular expressions selecting the checker codes to enable.
        exclude: Regular expressions selecting the checker codes to disable.
        issue_severity: The minimum severity (as `logging` level) of codes to enable.

    Returns:
        A list of available checkers that implement the CheckerInterface, sorted by class name.

    """
    # Check for extensions and try to import them
    _import_extensions()

    checkers: list[CheckerInterface] = []
    for checker in get_all_subclasses(CheckerInterface):
        if checker.__name__ == DummyChecker.__name__:
            continue

        # Initialize DummyCheckers to retrieve error messages.
        if issubclass(checker, DummyChecker):
            checker()

        enabled, relevant_codes = __get_checker_enabled(
            checker, include, exclude, issue_severity
        )

        if enabled:
            checkers.append(checker(include=relevant_codes))
        else:
            _logger.info("Checker %s is disabled.", checker.__name__)

    _logger.info(
        "Discovered and enabled checkers: [%s]",
        ", ".join([c.__class__.__name__ for c in checkers]),
    )
    if len(checkers) == 0:
        _logger.warning(
            "No checkers were enabled. Check the include and exclude arguments."
        )

    # Perform a uniqueness check on the codes emitted by the checkers.
    # Every unordered pair is visited once, such that each overlap is reported once.
    for index, checker1 in enumerate(checkers):
        for checker2 in checkers[index + 1 :]:
            if not set(checker1.codes).isdisjoint(checker2.codes):
                msg = f"Checker {checker1.__class__.__name__} and {checker2.__class__.__name__} have overlapping codes."
                _logger.error(msg)

    return sorted(checkers, key=lambda x: x.__class__.__name__)
895
896
def __get_checker_enabled(
    checker: type[CheckerInterface],
    include: Sequence[str],
    exclude: Sequence[str],
    issue_severity: int,
) -> tuple[bool, set[str]]:
    """Determines whether a checker is enabled and which of its codes are relevant.

    Every include and exclude regular expression has to match a code completely.

    Args:
        checker: The checker class to evaluate.
        include: Regular expressions selecting codes to enable. If empty, all codes are candidates
            and whether the checker is enabled depends on `enabled_by_default`.
        exclude: Regular expressions selecting codes to disable.
        issue_severity: The minimum severity a code must have to remain relevant.

    Returns:
        A tuple of whether the checker is enabled and the set of relevant codes.
        A checker without any relevant code is never enabled.

    """
    enabled = checker.enabled_by_default

    # If no include regexes are provided, include all by default
    if len(include) == 0:
        relevant_codes = set(checker.codes.keys())
    else:
        # If include regexes are provided, include all codes that match any of these regexes.
        # Each pattern is compiled once rather than once per code.
        include_patterns = [
            _regex_provider.compile("^" + regex + "$") for regex in include
        ]
        relevant_codes = {
            code
            for code in checker.codes.keys()
            if any(pattern.match(code) is not None for pattern in include_patterns)
        }

        # Explicitly including a code enables a checker that is disabled by default.
        if len(relevant_codes) > 0:
            enabled = True

    # Now remove the codes that are excluded according to any of the provided exclude regexes
    exclude_patterns = [
        _regex_provider.compile("^" + regex + "$") for regex in exclude
    ]
    relevant_codes = {
        code
        for code in relevant_codes
        if all(pattern.match(code) is None for pattern in exclude_patterns)
    }

    # Now filter out irrelevant codes based on severity
    relevant_codes = {
        code
        for code in relevant_codes
        if checker.codes[code]["severity"] >= issue_severity
    }

    if len(relevant_codes) == 0:
        enabled = False

    return enabled, relevant_codes
950
951
def analyze_rule(
    rule: idstools.rule.Rule,
    checkers: Optional[Sequence[CheckerInterface]] = None,
    ignore: Optional[Sequence[str]] = None,
) -> RuleReport:
    """Checks a rule and returns a report containing the rule and the issues found.

    Args:
        rule: The rule to be checked.
        checkers: The checkers to be used to check the rule.
        ignore: Regular expressions to match checker codes to ignore

    Returns:
        A rule report containing the rule, the issues found in it and a summary of those issues.

    Raises:
        InvalidRuleError: If the rule does not follow the Suricata syntax.

    """
    if not is_valid_rule(rule):
        raise InvalidRuleError(rule["raw"])

    check_rule_option_recognition(rule)

    if checkers is None:
        checkers = get_checkers()

    rule_report: RuleReport = RuleReport(rule=rule)

    # Only mention ignore patterns when there are any, and at debug level,
    # instead of emitting a warning for every single rule.
    if ignore:
        _logger.debug("Ignoring codes matching: %s", ignore)

    compiled_ignore = (
        [_regex_provider.compile(r) for r in ignore] if ignore is not None else []
    )

    for checker in checkers:
        try:
            issues = checker.check_rule(rule)
            if compiled_ignore:
                # Drop every issue whose code is matched by any of the ignore patterns.
                issues = [
                    issue
                    for issue in issues
                    if all(r.match(issue.code) is None for r in compiled_ignore)
                ]
            rule_report.add_issues(issues)
        except Exception as exception:  # noqa: BLE001
            # A failing checker should not prevent the other checkers from running.
            _logger.warning(
                "Failed to run %s on rule: %s",
                checker.__class__.__name__,
                rule["raw"],
                extra={"exception": exception},
            )

    rule_report.summary = __summarize_rule(rule_report, checkers)

    return rule_report
1005
1006
def __summarize_rule(
    rule: RuleReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> RULE_SUMMARY_TYPE:
    """Summarizes the issues found in a rule.

    Args:
        rule: The rule report to be summarized.
        checkers: The checkers to be used to check the rule.

    Returns:
        A dictionary containing the total number of issues (`total_issues`)
        and the number of issues per checker (`issues_by_group`), sorted by checker name.

    """
    if checkers is None:
        checkers = get_checkers()

    summary = {}

    issues: ISSUES_TYPE = rule.issues
    summary["total_issues"] = len(issues)

    issues_by_group = defaultdict(int)
    for issue in issues:
        checker = issue.checker
        # Issues without a known checker cannot be grouped;
        # a `None` key would also break the sorting below.
        if checker is not None:
            issues_by_group[checker] += 1

    # Ensure also checkers without issues are included in the report.
    for checker in checkers:
        if checker.__class__.__name__ not in issues_by_group:
            issues_by_group[checker.__class__.__name__] = 0

    # Sort dictionaries for deterministic output
    summary["issues_by_group"] = __sort_mapping(issues_by_group)

    return summary
1042
1043
def __summarize_output(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputSummary:
    """Summarizes the issues found in a rules file.

    Args:
        output: The unsummarized output of the rules file containing all rules and their issues.
        checkers: The checkers to be used to check the rule.

    Returns:
        A summary consisting of the overall counts, the issues per checker and the issues per code.

    """
    if checkers is None:
        checkers = get_checkers()

    overall = __get_overall_summary(output)
    by_group = __get_issues_by_group(output, checkers)
    by_type = __get_issues_by_type(output, checkers)

    return OutputSummary(
        overall_summary=overall,
        issues_by_group=by_group,
        issues_by_type=by_type,
    )
1066
1067
def __get_overall_summary(
    output: OutputReport,
) -> SIMPLE_SUMMARY_TYPE:
    """Counts the total number of issues and the number of rules with and without issues."""
    total_issues = 0
    rules_with_issues = 0
    rules_without_issues = 0

    for report in output.rules:
        n_issues = len(report.issues)
        total_issues += n_issues

        if n_issues == 0:
            rules_without_issues += 1
        else:
            rules_with_issues += 1

    return {
        "Total Issues": total_issues,
        "Rules with Issues": rules_with_issues,
        "Rules without Issues": rules_without_issues,
    }
1088
1089
def __get_issues_by_group(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> SIMPLE_SUMMARY_TYPE:
    """Counts the issues per checker over all rules, sorted by checker name."""
    if checkers is None:
        checkers = get_checkers()

    # Start every checker at zero, so checkers without issues are included in the report as well.
    counts = {checker.__class__.__name__: 0 for checker in checkers}

    for report in output.rules:
        for issue in report.issues:
            group = issue.checker
            if group is None:
                continue
            counts[group] = counts.get(group, 0) + 1

    return __sort_mapping(counts)
1113
1114
def __get_issues_by_type(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> EXTENSIVE_SUMMARY_TYPE:
    """Counts the issues per checker and per code over all rules, with both levels sorted by key."""
    if checkers is None:
        checkers = get_checkers()

    counts: EXTENSIVE_SUMMARY_TYPE = defaultdict(lambda: defaultdict(int))

    # Start every code at zero, so checkers and codes without issues are included in the report as well.
    for checker in checkers:
        checker_name = checker.__class__.__name__
        for code in checker.codes:
            counts[checker_name][code] = 0

    for report in output.rules:
        for issue in report.issues:
            if issue.checker is None:
                continue
            counts[issue.checker][issue.code] += 1

    sorted_per_checker = {
        checker_name: __sort_mapping(code_counts)
        for checker_name, code_counts in counts.items()
    }

    return __sort_mapping(sorted_per_checker)
1147
1148
def __sort_mapping(mapping: Mapping) -> dict:
    """Returns a new dictionary with the items of the mapping inserted in sorted key order."""
    ordered = {}
    for key in sorted(mapping):
        ordered[key] = mapping[key]
    return ordered
1151
1152
# Run the command line utility when this module is executed as a script.
if __name__ == "__main__":
    main()