1"""The `suricata_check.suricata_check` module contains the command line utility and the main program logic."""
2
3import atexit
4import configparser
5import io
6import json
7import logging
8import logging.handlers
9import multiprocessing
10import os
11import pkgutil
12import sys
13from collections import defaultdict
14from collections.abc import Mapping, Sequence
15from functools import lru_cache
16from typing import (
17 Any,
18 Literal,
19 Optional,
20 TypeVar,
21 Union,
22 overload,
23)
24
25import click
26import idstools.rule
27import tabulate
28
# Put the parent directory of this module's directory at the front of `sys.path`,
# such that the `suricata_check` package imported below is the copy corresponding to this CLI module.
_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if sys.path[0] != _suricata_check_path:
    sys.path.insert(0, _suricata_check_path)
33
34from suricata_check import ( # noqa: E402
35 __version__,
36 check_for_update,
37 get_dependency_versions,
38)
39from suricata_check.checkers.interface import CheckerInterface # noqa: E402
40from suricata_check.checkers.interface.dummy import DummyChecker # noqa: E402
41from suricata_check.utils._click import ClickHandler, ClickHelpOption # noqa: E402
42from suricata_check.utils._path import find_rules_file # noqa: E402
43from suricata_check.utils.checker import ( # noqa: E402
44 check_rule_option_recognition,
45 get_rule_suboption,
46)
47from suricata_check.utils.checker_typing import ( # noqa: E402
48 EXTENSIVE_SUMMARY_TYPE,
49 ISSUES_TYPE,
50 RULE_REPORTS_TYPE,
51 RULE_SUMMARY_TYPE,
52 SIMPLE_SUMMARY_TYPE,
53 InvalidRuleError,
54 OutputReport,
55 OutputSummary,
56 RuleReport,
57 get_all_subclasses,
58)
59from suricata_check.utils.regex import get_regex_provider, is_valid_rule # noqa: E402
60
# Names accepted for the `--log-level` and `--issue-severity` options.
LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
# Maps `logging` levels to the `severity` strings written to the GitLab report.
GITLAB_SEVERITIES = {
    logging.DEBUG: "info",
    logging.INFO: "info",
    logging.WARNING: "minor",
    logging.ERROR: "major",
    logging.CRITICAL: "critical",
}
# Maps `logging` levels to the level names used in GitHub workflow commands.
GITHUB_SEVERITIES = {
    logging.DEBUG: "debug",
    logging.INFO: "notice",
    logging.WARNING: "warning",
    logging.ERROR: "error",
    logging.CRITICAL: "error",
}
# Template for a single GitHub workflow command annotating a range of lines in a file.
GITHUB_COMMAND = (
    "::{level} file={file},line={line},endLine={end_line},title={title}::{message}"
)

_logger = logging.getLogger(__name__)

# Provider used to compile all regular expressions in this module.
_regex_provider = get_regex_provider()

# Global variable to check if extensions have already been imported in case get_checkers() is called multiple times.
suricata_check_extensions_imported = False
87
88
@click.command()
# NOTE: `--ini` intentionally has no short option. It used to declare `-i` as well,
# but `--include` declares the same short option later, and the option registered
# last wins in the parser. `-i` therefore always meant `--include`.
@click.option(
    "--ini",
    help="Path to suricata-check.ini file to read configuration from.",
    show_default=True,
)
@click.option(
    "--rules",
    "-r",
    help="Path to Suricata rules to provide check on.",
    show_default=True,
)
@click.option(
    "--single-rule",
    "-s",
    help="A single Suricata rule to be checked",
    show_default=False,
)
@click.option(
    "--out",
    "-o",
    help="Path to suricata-check output folder.",
    show_default=True,
)
@click.option(
    "--log-level",
    help=f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
    show_default=True,
)
@click.option(
    "--gitlab",
    help="Flag to create CodeClimate output report for GitLab CI/CD.",
    show_default=True,
    is_flag=True,
)
@click.option(
    "--github",
    help="Flag to write workflow commands to stdout for GitHub CI/CD.",
    show_default=True,
    is_flag=True,
)
@click.option(
    "--evaluate-disabled",
    help="Flag to evaluate disabled rules.",
    show_default=True,
    is_flag=True,
)
@click.option(
    "--issue-severity",
    help=f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
    show_default=True,
)
@click.option(
    "--include-all",
    "-a",
    help="Flag to indicate all checker codes should be enabled.",
    show_default=True,
    is_flag=True,
)
@click.option(
    "--include",
    "-i",
    help="List of all checker codes to enable.",
    show_default=True,
    multiple=True,
)
@click.option(
    "--exclude",
    "-e",
    help="List of all checker codes to disable.",
    show_default=True,
    multiple=True,
)
@click.help_option("-h", "--help", cls=ClickHelpOption)
def main(  # noqa: PLR0915
    **kwargs: Any,
) -> None:
    """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.

    Raises:
      BadParameter: If provided arguments are invalid.

      RuntimeError: If no checkers could be automatically discovered.

    """
    # NOTE: The docstring above is used by click as the help text of the command,
    # so it is part of the user-facing output and is kept unchanged.

    # Look for a ini file and parse it.
    ini_kwargs = __get_ini_kwargs(
        str(kwargs["ini"]) if kwargs["ini"] is not None else None
    )

    # Verify CLI argument types and get CLI arguments or use default arguments.
    # The CLI arguments are listed first and therefore take precedence over the ini file.
    rules: str = __get_verified_kwarg([kwargs, ini_kwargs], "rules", str, False, ".")
    single_rule: Optional[str] = __get_verified_kwarg(
        [kwargs, ini_kwargs], "single_rule", str, True, None
    )
    out: str = __get_verified_kwarg([kwargs, ini_kwargs], "out", str, False, ".")
    log_level: LogLevel = __get_verified_kwarg(
        [kwargs, ini_kwargs], "log_level", str, False, "DEBUG"
    )
    gitlab: bool = __get_verified_kwarg(
        [kwargs, ini_kwargs], "gitlab", bool, False, False
    )
    github: bool = __get_verified_kwarg(
        [kwargs, ini_kwargs], "github", bool, False, False
    )
    evaluate_disabled: bool = __get_verified_kwarg(
        [kwargs, ini_kwargs], "evaluate_disabled", bool, False, False
    )
    issue_severity: LogLevel = __get_verified_kwarg(
        [kwargs, ini_kwargs], "issue_severity", str, False, "INFO"
    )
    include_all: bool = __get_verified_kwarg(
        [kwargs, ini_kwargs], "include_all", bool, False, False
    )
    include: tuple[str, ...] = __get_verified_kwarg(
        [kwargs, ini_kwargs], "include", tuple, False, ()
    )
    exclude: tuple[str, ...] = __get_verified_kwarg(
        [kwargs, ini_kwargs], "exclude", tuple, False, ()
    )

    # Verify that out argument is valid
    if os.path.exists(out) and not os.path.isdir(out):
        raise click.BadParameter(f"Error: {out} is not a directory.")

    # Verify that log_level argument is valid
    if log_level not in LOG_LEVELS:
        raise click.BadParameter(f"Error: {log_level} is not a valid log level.")

    # Create out directory if non-existent
    if not os.path.exists(out):
        os.makedirs(out)

    # Setup logging from a separate thread
    queue = multiprocessing.Manager().Queue()
    queue_handler = logging.handlers.QueueHandler(queue)

    click_handler = ClickHandler(
        github=github, github_level=getattr(logging, log_level)
    )
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=(queue_handler, click_handler),
        force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", "FALSE") == "TRUE",
    )

    # Records put on the queue are written to the log file by the queue listener.
    file_handler = logging.FileHandler(
        filename=os.path.join(out, "suricata-check.log"),
        delay=True,
    )
    queue_listener = logging.handlers.QueueListener(
        queue,
        file_handler,
        respect_handler_level=True,
    )

    def _at_exit() -> None:
        """Cleans up logging listener and handlers before exiting."""
        queue_listener.enqueue_sentinel()
        queue_listener.stop()
        file_handler.flush()
        file_handler.close()
        # Prevent the cleanup from running a second time at interpreter exit.
        atexit.unregister(_at_exit)

    atexit.register(_at_exit)

    queue_listener.start()

    # Log the arguments:
    _logger.info("Running suricata-check with the following arguments:")
    _logger.info("out: %s", out)
    _logger.info("rules: %s", rules)
    _logger.info("single_rule: %s", single_rule)
    _logger.info("log_level: %s", log_level)
    _logger.info("gitlab: %s", gitlab)
    _logger.info("github: %s", github)
    _logger.info("evaluate_disabled: %s", evaluate_disabled)
    _logger.info("issue_severity: %s", issue_severity)
    _logger.info("include_all: %s", include_all)
    _logger.info("include: %s", include)
    _logger.info("exclude: %s", exclude)

    # Log the environment:
    _logger.debug("Platform: %s", sys.platform)
    _logger.debug("Python version: %s", sys.version)
    _logger.debug("suricata-check path: %s", _suricata_check_path)
    _logger.debug("suricata-check version: %s", __version__)
    for package, version in get_dependency_versions().items():
        _logger.debug("Dependency %s version: %s", package, version)

    check_for_update()

    # Verify that include and exclude arguments are valid
    if include_all and len(include) > 0:
        raise click.BadParameter(
            "Error: Cannot use --include-all and --include together."
        )
    if include_all:
        include = (".*",)

    # Verify that issue_severity argument is valid
    if issue_severity not in LOG_LEVELS:
        raise click.BadParameter(
            f"Error: {issue_severity} is not a valid issue severity or log level."
        )

    checkers = get_checkers(
        include, exclude, issue_severity=getattr(logging, issue_severity)
    )

    if single_rule is not None:
        __main_single_rule(out, single_rule, checkers)

        # Return here so no rules file is processed.
        _at_exit()
        return

    # Check if the rules argument is valid and find the rules file
    rules = find_rules_file(rules)

    output = process_rules_file(rules, evaluate_disabled, checkers=checkers)

    __write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)

    _at_exit()
316
317
def __get_ini_kwargs(path: Optional[str]) -> dict[str, Any]:  # noqa: C901, PLR0912
    """Reads options from an INI file and returns them keyed by keyword argument name.

    Options are read from the `suricata-check` section. Options spelled with hyphens
    or underscores are both accepted, and the log level may be given as `log-level`,
    `log_level` or `log`; it is returned under the key `log_level`.

    Args:
        path: Path to the INI file, or `None` to look for `suricata-check.ini` in the current working directory.

    Returns:
        A dictionary containing only the options that are present in the INI file.
        The dictionary is empty if no path was given and the default file does not exist.

    Raises:
        BadParameter: If a path was given explicitly but does not exist.

    """
    if path is not None:
        if not os.path.exists(path):
            raise click.BadParameter(
                f"Error: INI file provided in {path} but no options loaded"
            )
    else:
        # Use the default path if no path was provided
        path = "suricata-check.ini"
        if not os.path.exists(path):
            return {}

    config_parser = configparser.ConfigParser(
        empty_lines_in_values=False,
        default_section="suricata-check",
        # Registers `gettuple`, which parses a JSON list into a tuple.
        converters={"tuple": lambda x: tuple(json.loads(x))},
    )
    config_parser.read(path)

    section = "suricata-check"

    # (keyword argument name, accepted option spellings in order of preference, getter)
    options = (
        ("rules", ("rules",), config_parser.get),
        ("out", ("out",), config_parser.get),
        ("log_level", ("log-level", "log_level", "log"), config_parser.get),
        ("gitlab", ("gitlab",), config_parser.getboolean),
        ("github", ("github",), config_parser.getboolean),
        (
            "evaluate_disabled",
            ("evaluate_disabled", "evaluate-disabled"),
            config_parser.getboolean,
        ),
        ("issue_severity", ("issue-severity", "issue_severity"), config_parser.get),
        ("include_all", ("include-all", "include_all"), config_parser.getboolean),
        ("include", ("include",), config_parser.gettuple),  # type: ignore reportAttributeAccessIssue
        ("exclude", ("exclude",), config_parser.gettuple),  # type: ignore reportAttributeAccessIssue
    )

    ini_kwargs: dict[str, Any] = {}
    for name, spellings, getter in options:
        for spelling in spellings:
            if config_parser.has_option(section, spelling):
                ini_kwargs[name] = getter(section, spelling)
                break

    return ini_kwargs
368
369
D = TypeVar("D")


@overload
def __get_verified_kwarg(
    kwargss: Sequence[dict[str, Any]],
    name: str,
    expected_type: type,
    optional: Literal[True],
    default: D,
) -> Optional[D]:
    pass


@overload
def __get_verified_kwarg(
    kwargss: Sequence[dict[str, Any]],
    name: str,
    expected_type: type,
    optional: Literal[False],
    default: D,
) -> D:
    pass


def __get_verified_kwarg(
    kwargss: Sequence[dict[str, Any]],
    name: str,
    expected_type: type,
    optional: bool,
    default: D,
) -> Optional[D]:
    """Returns the first usable value for an argument from several sources of arguments.

    The sources are consulted in order. A source is skipped when it does not contain
    the argument, when its value is `None`, or when its value is the default object
    itself. Skipping `None` matters because the first source may hold `None` for an
    argument that was simply not provided, in which case later sources must still be
    consulted.

    Args:
        kwargss: The sources of arguments in order of precedence.
        name: The name of the argument to look up.
        expected_type: The type that a provided value must be an instance of.
        optional: Whether `None` is an acceptable result for this argument.
        default: The value to use if no source provides a usable value.

    Returns:
        The first usable value, otherwise `None` if some source held `None` for an
        optional argument with a non-`None` default, otherwise the default.

    Raises:
        BadParameter: If a provided value is not an instance of `expected_type`.

    """
    explicit_none = False

    for kwargs in kwargss:
        if name not in kwargs:
            continue

        value = kwargs[name]

        if value is None:
            # Not provided by this source, a lower-priority source may still provide it.
            explicit_none = True
            continue

        # Identity (not equality) is used on purpose, so a provided value that merely
        # equals the default is still treated as provided.
        if value is default:
            continue

        if not isinstance(value, expected_type):
            raise click.BadParameter(
                f"Error:  Argument `{name}` should have a value of type `{expected_type}`  "
                f"but has value {value} of type {value.__class__} instead."
            )
        return value

    if explicit_none and optional and default is not None:
        return None
    return default
419
420
def __main_single_rule(
    out: str, single_rule: str, checkers: Optional[Sequence[CheckerInterface]]
) -> None:
    """Checks a single rule given as text and writes the resulting report to the output folder.

    Args:
        out: Path to the output folder.
        single_rule: The text of the rule to check.
        checkers: The checkers to be used to check the rule.

    Raises:
        BadParameter: If the text could not be parsed as a rule or is not a valid rule.

    """
    parsed_rule: Optional[idstools.rule.Rule] = idstools.rule.parse(single_rule)

    # Unparsable input and invalid rules are reported in exactly the same way.
    if parsed_rule is None or not is_valid_rule(parsed_rule):
        msg = f"Error parsing rule from user input: {single_rule}"
        _logger.critical(msg)
        raise click.BadParameter(f"Error: {msg}")

    _logger.debug("Processing rule: %s", parsed_rule["sid"])

    report = analyze_rule(parsed_rule, checkers=checkers)
    __write_output(OutputReport(rules=[report]), out)
442
443
def __write_output(
    output: OutputReport,
    out: str,
    gitlab: bool = False,
    github: bool = False,
    rules_file: Optional[str] = None,
) -> None:
    """Writes the report to the output folder and echoes every issue to the terminal.

    Writes `suricata-check.jsonl` (one line per rule report) and
    `suricata-check-fast.log` (one line per issue). Statistics and the GitLab and
    GitHub outputs are produced when a summary is available or the respective flag
    is set.

    Args:
        out: Path to the output folder.
        output: The report to write.
        gitlab: Whether to write the GitLab report.
        github: Whether to print GitHub workflow commands.
        rules_file: Path of the processed rules file, required if `gitlab` or `github` is set.

    """
    _logger.info(
        "Writing output to suricata-check.jsonl and suricata-check-fast.log in %s",
        os.path.abspath(out),
    )

    jsonl_path = os.path.join(out, "suricata-check.jsonl")
    fast_path = os.path.join(out, "suricata-check-fast.log")

    with (
        open(jsonl_path, "w", buffering=io.DEFAULT_BUFFER_SIZE) as jsonl_fh,
        open(fast_path, "w", buffering=io.DEFAULT_BUFFER_SIZE) as fast_fh,
    ):
        reports: RULE_REPORTS_TYPE = output.rules
        jsonl_fh.write("\n".join(str(report) for report in reports))

        for report in reports:
            rule: idstools.rule.Rule = report.rule

            if report.line_begin:
                line_span = "{}-{}".format(report.line_begin, report.line_end)
            else:
                line_span = "Unknown"

            for issue in report.issues:
                severity_name = None
                if issue.severity:
                    severity_name = logging.getLevelName(issue.severity)

                severity_part = ""
                if severity_name:
                    severity_part = f" ({severity_name})"

                # Keep every issue on a single line in the fast log.
                flat_message = issue.message.replace("\n", " ")

                msg = "[{}]{} Lines {}, sid {}: {}".format(
                    issue.code,
                    severity_part,
                    line_span,
                    rule["sid"],
                    flat_message,
                )
                fast_fh.write(msg + "\n")
                click.secho(msg, color=True, fg="blue")

    if output.summary is not None:
        __write_output_stats(output, out)

    if gitlab:
        assert rules_file is not None

        __write_output_gitlab(output, out, rules_file)

    if github:
        assert rules_file is not None

        __write_output_github(output, rules_file)
507
508
def __write_output_stats(output: OutputReport, out: str) -> None:
    """Writes the summary tables to `suricata-check-stats.log` and echoes the totals to the terminal.

    Args:
        output: The report, which must have a summary.
        out: Path to the output folder.

    """
    assert output.summary is not None

    def _share(count: int, total: int) -> str:
        """Formats `count` as a percentage of `total`, or `-` if there is nothing to divide by."""
        if total > 0:
            return "{:.0%}".format(count / total)
        return "-"

    stats_path = os.path.join(out, "suricata-check-stats.log")

    with open(stats_path, "w", buffering=io.DEFAULT_BUFFER_SIZE) as stats_fh:
        summary: OutputSummary = output.summary

        overall_summary: SIMPLE_SUMMARY_TYPE = summary.overall_summary

        n_issues = overall_summary["Total Issues"]
        n_rules = (
            overall_summary["Rules with Issues"]
            + overall_summary["Rules without Issues"]
        )

        # Only the rows counting rules are expressed as a percentage of rules.
        overall_rows = [
            (
                label,
                count,
                _share(count, n_rules) if label.startswith("Rules ") else "-",
            )
            for label, count in overall_summary.items()
        ]
        overall_table = tabulate.tabulate(
            overall_rows,
            headers=(
                "Count",
                "Percentage of Rules",
            ),
        )
        stats_fh.write(overall_table + "\n\n")

        click.secho(
            f"Total issues found: {overall_summary['Total Issues']}",
            color=True,
            bold=True,
            fg="blue",
        )
        click.secho(
            f"Rules with Issues found: {overall_summary['Rules with Issues']}",
            color=True,
            bold=True,
            fg="blue",
        )

        issues_by_group: SIMPLE_SUMMARY_TYPE = summary.issues_by_group

        group_rows = [
            (group, count, _share(count, n_issues))
            for group, count in issues_by_group.items()
        ]
        group_table = tabulate.tabulate(
            group_rows,
            headers=(
                "Count",
                "Percentage of Total Issues",
            ),
        )
        stats_fh.write(group_table + "\n\n")

        issues_by_type: EXTENSIVE_SUMMARY_TYPE = summary.issues_by_type
        for checker, checker_issues_by_type in issues_by_type.items():
            # Title of the table followed by an underline of matching width.
            stats_fh.write(" " + checker + " " + "\n")
            stats_fh.write("-" * (len(checker) + 2) + "\n")

            type_rows = [
                (code, count, _share(count, n_rules))
                for code, count in checker_issues_by_type.items()
            ]
            type_table = tabulate.tabulate(
                type_rows,
                headers=(
                    "Count",
                    "Percentage of Rules",
                ),
            )
            stats_fh.write(type_table + "\n\n")
599
600
def __write_output_gitlab(output: OutputReport, out: str, rules_file: str) -> None:
    """Writes all issues as a JSON list to `suricata-check-gitlab.json` in the output folder.

    Args:
        output: The report containing the rules and their issues.
        out: Path to the output folder.
        rules_file: Path of the rules file, used as the location of every issue.

    """
    report_path = os.path.join(out, "suricata-check-gitlab.json")

    with open(report_path, "w", buffering=io.DEFAULT_BUFFER_SIZE) as gitlab_fh:
        entries = []

        for report in output.rules:
            first_line: Optional[int] = report.line_begin
            assert first_line is not None
            last_line: Optional[int] = report.line_end
            assert last_line is not None

            for issue in report.issues:
                code = issue.code
                description = issue.message.replace("\n", " ")
                assert issue.checker is not None
                category = issue.checker
                fingerprint = str(issue.hash)
                assert issue.severity is not None
                severity = GITLAB_SEVERITIES[issue.severity]

                entry: Mapping[
                    str,
                    Union[str, list[str], Mapping[str, Union[str, Mapping[str, int]]]],
                ] = {
                    "description": description,
                    "categories": [category],
                    "check_name": f"Suricata Check {code}",
                    "fingerprint": fingerprint,
                    "severity": severity,
                    "location": {
                        "path": rules_file,
                        "lines": {"begin": first_line, "end": last_line},
                    },
                }
                entries.append(entry)

        gitlab_fh.write(json.dumps(entries))
640
641
def __write_output_github(output: OutputReport, rules_file: str) -> None:
    """Prints all issues to stdout as GitHub workflow commands, grouped by level.

    Args:
        output: The report containing the rules and their issues.
        rules_file: Path of the rules file, used as the file of every annotation.

    """
    # `dict.fromkeys` removes duplicate levels while keeping their order of declaration.
    # A `set` of strings would make the order of the groups vary between runs.
    output_lines: dict[str, list[str]] = {
        level: [] for level in dict.fromkeys(GITHUB_SEVERITIES.values())
    }
    for rule_report in output.rules:
        line_begin: Optional[int] = rule_report.line_begin
        assert line_begin is not None
        line_end: Optional[int] = rule_report.line_end
        assert line_end is not None
        issues: ISSUES_TYPE = rule_report.issues
        for issue in issues:
            code = issue.code
            issue_msg = issue.message.replace("\n", " ")
            assert issue.checker is not None
            issue_checker = issue.checker
            assert issue.severity is not None
            issue_severity = GITHUB_SEVERITIES[issue.severity]
            title = f"{issue_checker} - {code}"

            output_lines[issue_severity].append(
                GITHUB_COMMAND.format(
                    level=issue_severity,
                    file=rules_file,
                    line=line_begin,
                    end_line=line_end,
                    title=title,
                    message=issue_msg,
                )
            )

    # Levels without any issue do not get a group.
    for message_level, lines in output_lines.items():
        if len(lines) > 0:
            print(f"::group::{message_level}")  # noqa: T201
            for message in lines:
                print(message)  # noqa: T201
            print("::endgroup::")  # noqa: T201
678
679
[docs]
def process_rules_file(  # noqa: C901, PLR0912, PLR0915
    rules: str,
    evaluate_disabled: bool,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputReport:
    """Processes a rule file and returns a report of the rules and their issues.

    Lines ending in a backslash are joined with the following lines into a single rule.
    Lines that cannot be parsed or that are not valid rules are logged and skipped.

    Args:
        rules: A path to a Suricata rules file.
        evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
        checkers: The checkers to be used when processing the rule file.

    Returns:
        A report containing a report for every processed rule and a summary.

    Raises:
        RuntimeError: If no checkers are available for processing the rules.

    """
    if checkers is None:
        checkers = get_checkers()

    output = OutputReport()

    with open(
        os.path.normpath(rules),
        buffering=io.DEFAULT_BUFFER_SIZE,
    ) as rules_fh:
        if len(checkers) == 0:
            msg = "No checkers provided for processing rules."
            _logger.error(msg)
            raise RuntimeError(msg)

        _logger.info("Processing rule file: %s", rules)

        collected_multiline_parts: Optional[str] = None
        multiline_begin_number: Optional[int] = None

        for number, line in enumerate(rules_fh, start=1):
            # First work on collecting and parsing multiline rules
            if line.rstrip("\r\n").endswith("\\"):
                multiline_part = line.rstrip("\r\n")[:-1]

                if collected_multiline_parts is None:
                    collected_multiline_parts = multiline_part
                    multiline_begin_number = number
                else:
                    collected_multiline_parts += multiline_part.lstrip()

                continue

            # Process final part of multiline rule if one is being collected
            if collected_multiline_parts is not None:
                collected_multiline_parts += line.lstrip()

                rule_line = collected_multiline_parts.strip()
                line_begin = (
                    multiline_begin_number
                    if multiline_begin_number is not None
                    else number
                )

                # Reset the multiline state right away, such that a rule that is
                # rejected below does not affect the line numbers of later rules.
                collected_multiline_parts = None
                multiline_begin_number = None
            # If no multiline rule is being collected process as a potential single line rule
            else:
                if len(line.strip()) == 0:
                    continue

                if line.strip().startswith("#"):
                    if not evaluate_disabled:
                        # Skip the rule
                        continue

                    # Verify that this line is a rule and not a comment
                    if not __is_valid_idstools_rule(line):
                        # Log the comment since it may be a invalid rule
                        _logger.warning(
                            "Ignoring comment on line %i: %s", number, line
                        )
                        continue

                rule_line = line.strip()
                line_begin = number

            try:
                rule: Optional[idstools.rule.Rule] = idstools.rule.parse(rule_line)
            except Exception:  # noqa: BLE001
                _logger.error(
                    "Internal error in idstools parsing rule on line %i: %s",
                    number,
                    rule_line,
                )
                rule = None

            # Parse comment and potential ignore comment to ignore rules
            ignore = __parse_type_ignore(rule)

            # Verify that a rule was parsed correctly.
            if rule is None:
                _logger.error("Error parsing rule on line %i: %s", number, rule_line)
                continue

            if not is_valid_rule(rule):
                _logger.error("Invalid rule on line %i: %s", number, rule_line)
                continue

            _logger.debug("Processing rule: %s on line %i", rule["sid"], number)

            rule_report: RuleReport = analyze_rule(
                rule,
                checkers=checkers,
                ignore=ignore,
            )
            rule_report.line_begin = line_begin
            rule_report.line_end = number

            output.rules.append(rule_report)

        # A continuation on the last line of the file never gets completed.
        if collected_multiline_parts is not None:
            _logger.warning(
                "Ignoring unterminated multiline rule starting on line %i",
                multiline_begin_number,
            )

    _logger.info("Completed processing rule file: %s", rules)

    output.summary = __summarize_output(output, checkers)

    return output
801
802
def __is_valid_idstools_rule(text: str) -> bool:
    """Returns whether idstools parses the text into a rule without raising an exception."""
    try:
        return idstools.rule.parse(text) is not None
    except Exception:  # noqa: BLE001
        # Any failure while parsing means the text is not a usable rule.
        return False
813
814
def __parse_type_ignore(rule: Optional[idstools.rule.Rule]) -> Optional[Sequence[str]]:
    """Extracts the codes to ignore from the `suricata-check` suboption of the rule metadata.

    Args:
        rule: The parsed rule, or `None` if the rule could not be parsed.

    Returns:
        `None` if no rule was given, otherwise the list of comma-separated entries
        of the suboption, which is empty if the suboption is absent.

    """
    if rule is None:
        return None

    ignore_value = get_rule_suboption(rule, "metadata", "suricata-check")
    if ignore_value is None:
        return []

    # Strip whitespace around every entry, as it would otherwise become part of the
    # regular expression. Drop empty entries (e.g. from a trailing comma), as an empty
    # regular expression matches every code and would ignore all issues.
    entries = (entry.strip() for entry in ignore_value.strip(' "').split(","))
    return [entry for entry in entries if entry]
824
825
def _import_extensions() -> None:
    """Imports every discoverable module whose name starts with `suricata_check_`.

    Modules are only imported the first time this function is called.
    Modules that fail to import are logged and skipped.
    """
    global suricata_check_extensions_imported  # noqa: PLW0603
    if suricata_check_extensions_imported is True:
        return

    for module in pkgutil.iter_modules():
        if module.name.startswith("suricata_check_"):
            try:
                imported_module = __import__(module.name)
                _logger.info(
                    "Detected and successfully imported suricata-check extension %s with version %s.",
                    module.name.replace("_", "-"),
                    # An extension without a version should still be usable.
                    getattr(imported_module, "__version__", "unknown"),
                )
            except ImportError:
                _logger.warning(
                    "Detected potential suricata-check extension %s but failed to import it.",
                    module.name.replace("_", "-"),
                )
    suricata_check_extensions_imported = True
846
847
[docs]
@lru_cache(maxsize=1)
def get_checkers(
    include: Sequence[str] = (".*",),
    exclude: Sequence[str] = (),
    issue_severity: int = logging.INFO,
) -> Sequence[CheckerInterface]:
    """Auto discovers all available checkers that implement the CheckerInterface.

    The result is cached for the most recent combination of arguments, which is why
    the arguments must be hashable (e.g. tuples rather than lists).

    Args:
        include: Regular expressions of the checker codes to enable.
        exclude: Regular expressions of the checker codes to disable.
        issue_severity: The minimum severity of the checker codes to enable.

    Returns:
        A list of available checkers that implement the CheckerInterface, sorted by class name.

    """
    # Check for extensions and try to import them
    _import_extensions()

    checkers: list[CheckerInterface] = []
    for checker in get_all_subclasses(CheckerInterface):
        if checker.__name__ == DummyChecker.__name__:
            continue

        # Initialize DummyCheckers to retrieve error messages.
        if issubclass(checker, DummyChecker):
            checker()

        enabled, relevant_codes = __get_checker_enabled(
            checker, include, exclude, issue_severity
        )

        if enabled:
            checkers.append(checker(include=relevant_codes))

        else:
            _logger.info("Checker %s is disabled.", checker.__name__)

    _logger.info(
        "Discovered and enabled checkers: [%s]",
        ", ".join([c.__class__.__name__ for c in checkers]),
    )
    if len(checkers) == 0:
        _logger.warning(
            "No checkers were enabled. Check the include and exclude arguments."
        )

    # Perform a uniqueness check on the codes emitted by the checkers.
    # Every pair of checkers is compared once, such that an overlap is reported once.
    for index, checker1 in enumerate(checkers):
        for checker2 in checkers[index + 1 :]:
            if not set(checker1.codes).isdisjoint(checker2.codes):
                msg = f"Checker {checker1.__class__.__name__} and {checker2.__class__.__name__} have overlapping codes."
                _logger.error(msg)

    return sorted(checkers, key=lambda x: x.__class__.__name__)
901
902
def __get_checker_enabled(
    checker: type[CheckerInterface],
    include: Sequence[str],
    exclude: Sequence[str],
    issue_severity: int,
) -> tuple[bool, set[str]]:
    """Determines whether a checker is enabled and which of its codes are relevant.

    Args:
        checker: The checker class to evaluate.
        include: Regular expressions of the codes to enable, which must match a code entirely.
        exclude: Regular expressions of the codes to disable, which must match a code entirely.
        issue_severity: The minimum severity a code must have to remain relevant.

    Returns:
        A tuple of whether the checker is enabled and the set of relevant codes.
        A checker without relevant codes is never enabled.

    """
    is_enabled = checker.enabled_by_default

    if len(include) == 0:
        # Without include regexes all codes are selected
        selected = set(checker.codes.keys())
    else:
        # Otherwise select the codes matching at least one of the include regexes
        selected = set()
        for pattern in include:
            selected |= {
                code
                for code in checker.codes.keys()
                if _regex_provider.compile("^" + pattern + "$").match(code) is not None
            }

        # Explicitly including a code enables a checker that is disabled by default
        if len(selected) > 0:
            is_enabled = True

    # Drop the codes matching any of the exclude regexes
    for pattern in exclude:
        selected = {
            code
            for code in selected
            if _regex_provider.compile("^" + pattern + "$").match(code) is None
        }

    # Drop the codes with a severity below the requested severity
    selected = {
        code
        for code in selected
        if checker.codes[code]["severity"] >= issue_severity
    }

    if len(selected) == 0:
        is_enabled = False

    return is_enabled, selected
956
957
[docs]
def analyze_rule(
    rule: idstools.rule.Rule,
    checkers: Optional[Sequence[CheckerInterface]] = None,
    ignore: Optional[Sequence[str]] = None,
) -> RuleReport:
    """Checks a rule and returns a report containing the rule and the issues found.

    A checker that raises an exception is logged and skipped, the remaining checkers still run.

    Args:
        rule: The rule to be checked.
        checkers: The checkers to be used to check the rule.
        ignore: Regular expressions to match checker codes to ignore.
            An issue is ignored if any expression matches the start of its code.

    Returns:
        A report containing the rule, the issues found in the rule and a summary of those issues.

    Raises:
        InvalidRuleError: If the rule does not follow the Suricata syntax.

    """
    if not is_valid_rule(rule):
        raise InvalidRuleError(rule["raw"])

    check_rule_option_recognition(rule)

    if checkers is None:
        checkers = get_checkers()

    rule_report: RuleReport = RuleReport(rule=rule)

    # Only relevant when debugging and only if there is something to ignore.
    if ignore:
        _logger.debug("Ignoring issues with codes matching: %s", ignore)

    compiled_ignore = (
        [_regex_provider.compile(r) for r in ignore] if ignore is not None else []
    )

    for checker in checkers:
        try:
            issues = checker.check_rule(rule)
            for r in compiled_ignore:
                issues = list(filter(lambda issue: r.match(issue.code) is None, issues))
            rule_report.add_issues(issues)
        except Exception as exception:  # noqa: BLE001
            _logger.warning(
                "Failed to run %s on rule: %s",
                checker.__class__.__name__,
                rule["raw"],
                extra={"exception": exception},
            )

    rule_report.summary = __summarize_rule(rule_report, checkers)

    return rule_report
1011
1012
def __summarize_rule(
    rule: RuleReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> RULE_SUMMARY_TYPE:
    """Summarizes the issues found in a rule.

    Args:
        rule: The report of the rule to be summarized.
        checkers: The checkers that were used to check the rule.

    Returns:
        A dictionary containing the total number of issues and the number of issues per checker.

    """
    if checkers is None:
        checkers = get_checkers()

    found: ISSUES_TYPE = rule.issues

    per_group = defaultdict(int)
    for issue in found:
        per_group[issue.checker] += 1

    # Ensure also checkers without issues are included in the report.
    for checker in checkers:
        per_group.setdefault(checker.__class__.__name__, 0)

    return {
        "total_issues": len(found),
        # Sort dictionaries for deterministic output
        "issues_by_group": __sort_mapping(per_group),
    }
1048
1049
def __summarize_output(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputSummary:
    """Summarizes the issues found in a rules file.

    Args:
        output: The unsummarized output of the rules file containing all rules and their issues.
        checkers: The checkers that were used to check the rules.

    Returns:
        A summary consisting of the overall counts, the issues per checker and the issues per code.

    """
    if checkers is None:
        checkers = get_checkers()

    overall = __get_overall_summary(output)
    by_group = __get_issues_by_group(output, checkers)
    by_type = __get_issues_by_type(output, checkers)

    return OutputSummary(
        overall_summary=overall,
        issues_by_group=by_group,
        issues_by_type=by_type,
    )
1072
1073
def __get_overall_summary(
    output: OutputReport,
) -> SIMPLE_SUMMARY_TYPE:
    """Counts the issues in total and the rules with and without issues."""
    issue_counts = [len(report.issues) for report in output.rules]
    rules_with_issues = sum(1 for count in issue_counts if count != 0)

    return {
        "Total Issues": sum(issue_counts),
        "Rules with Issues": rules_with_issues,
        "Rules without Issues": len(issue_counts) - rules_with_issues,
    }
1094
1095
def __get_issues_by_group(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> SIMPLE_SUMMARY_TYPE:
    """Counts the issues per checker over all rules, sorted by the name of the checker."""
    if checkers is None:
        checkers = get_checkers()

    # Start every checker at zero, such that checkers without issues are reported too.
    tally: dict = {checker.__class__.__name__: 0 for checker in checkers}

    for report in output.rules:
        for issue in report.issues:
            group = issue.checker
            if group is None:
                continue
            tally[group] = tally.get(group, 0) + 1

    return __sort_mapping(tally)
1119
1120
def __get_issues_by_type(
    output: OutputReport,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> EXTENSIVE_SUMMARY_TYPE:
    """Counts the issues per checker and per code over all rules, sorted by checker and by code."""
    if checkers is None:
        checkers = get_checkers()

    tally: dict = {}

    # Start every code at zero, such that codes without issues are reported too.
    for checker in checkers:
        for code in checker.codes:
            tally.setdefault(checker.__class__.__name__, {})[code] = 0

    for report in output.rules:
        for issue in report.issues:
            group = issue.checker
            if group is None:
                continue
            per_code = tally.setdefault(group, {})
            per_code[issue.code] = per_code.get(issue.code, 0) + 1

    # Sort both levels for deterministic output
    return __sort_mapping(
        {group: __sort_mapping(per_code) for group, per_code in tally.items()}
    )
1153
1154
def __sort_mapping(mapping: Mapping) -> dict:
    """Returns a new dictionary with the items of the mapping in sorted order of their keys."""
    ordered = {}
    for key in sorted(mapping.keys()):
        ordered[key] = mapping[key]
    return ordered
1157
1158
# Run the command line utility when this module is executed as a script.
if __name__ == "__main__":
    main()