1"""The `suricata_check.suricata_check` module contains the command line utility and the main program logic."""
2
3import atexit
4import configparser
5import io
6import json
7import logging
8import logging.handlers
9import multiprocessing
10import os
11import sys
12import threading
13from collections.abc import Sequence
14from typing import (
15 Any,
16 Callable,
17 Literal,
18 Optional,
19 TypeVar,
20 Union,
21)
22
23import click
24
# Type helpers for annotating decorators that wrap either a plain callable or a click command.
_AnyCallable = Callable[..., Any]
_FC = TypeVar("_FC", bound=Union[_AnyCallable, click.Command])

# Accepted values for the log level and issue severity options.
# NOTE(review): this name is assigned again, with the same value, after the project imports below.
LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")

# Put the parent directory of this file's directory at the front of `sys.path` (the module
# search path), such that the `suricata_check` version corresponding to the CLI is imported.
_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if sys.path[0] != _suricata_check_path:
    sys.path.insert(0, _suricata_check_path)
34
35from suricata_check._checkers import get_checkers # noqa: E402
36from suricata_check._output import ( # noqa: E402
37 summarize_output,
38 summarize_rule,
39 write_output,
40)
41from suricata_check._version import ( # noqa: E402
42 __version__,
43 check_for_update,
44 get_dependency_versions,
45)
46from suricata_check.checkers.interface import CheckerInterface # noqa: E402
47from suricata_check.utils._click import ClickHandler, help_option # noqa: E402
48from suricata_check.utils._path import find_rules_file # noqa: E402
49from suricata_check.utils.checker import ( # noqa: E402
50 check_rule_option_recognition,
51 get_rule_option,
52 get_rule_suboption,
53)
54from suricata_check.utils.checker_typing import ( # noqa: E402
55 InvalidRuleError,
56 OutputReport,
57 RuleReport,
58)
59from suricata_check.utils.regex import is_valid_rule # noqa: E402
60from suricata_check.utils.regex_provider import get_regex_provider # noqa: E402
61from suricata_check.utils.rule import ParsingError, Rule, parse # noqa: E402
62
# Accepted values for `log_level` and `issue_severity`; `main` rejects anything else.
LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
# Static-typing counterpart of LOG_LEVELS; the two must list the same values.
LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
65
# Define all command line arguments and their properties
#
# Each key is the Python-side argument name; the long CLI flag and the INI key are derived
# from it by replacing "_" with "-". Per-argument properties, as consumed in this module:
#   help, show_default, default, is_flag, multiple: forwarded to `click.option`.
#   cli_options: additional (short) flags registered next to the long flag.
#   type: used to pick the INI getter and to type-check resolved values;
#         it is not forwarded to `click.option`.
#   required: consulted only when a source supplies an explicit `None`.
CLI_ARGUMENTS: dict[str, dict[str, Any]] = {
    "rules": {
        "help": "Path to Suricata rules to provide check on.",
        "show_default": True,
        "type": str,
        "required": False,
        "default": ".",
        "cli_options": ["-r"],
    },
    "single_rule": {
        "help": "A single Suricata rule to be checked",
        "show_default": False,
        "type": str,
        "required": False,
        "default": None,
        "cli_options": ["-s"],
    },
    "out": {
        "help": "Path to suricata-check output folder.",
        "show_default": True,
        "type": str,
        "required": False,
        "default": ".",
        "cli_options": ["-o"],
    },
    "log_level": {
        "help": f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
        "show_default": True,
        "type": str,
        "required": False,
        "default": "DEBUG",
    },
    "gitlab": {
        "help": "Flag to create CodeClimate output report for GitLab CI/CD.",
        "show_default": True,
        "type": bool,
        "required": False,
        "default": False,
        "is_flag": True,
    },
    "github": {
        "help": "Flag to write workflow commands to stdout for GitHub CI/CD.",
        "show_default": True,
        "type": bool,
        "required": False,
        "default": False,
        "is_flag": True,
    },
    "evaluate_disabled": {
        "help": "Flag to evaluate disabled rules.",
        "show_default": True,
        "type": bool,
        "required": False,
        "default": False,
        "is_flag": True,
    },
    "issue_severity": {
        "help": f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
        "show_default": True,
        "type": str,
        "required": False,
        "default": "INFO",
    },
    "include_all": {
        "help": "Flag to indicate all checker codes should be enabled.",
        "show_default": True,
        "type": bool,
        "required": False,
        "default": False,
        "is_flag": True,
        "cli_options": ["-a"],
    },
    "include": {
        "help": "List of all checker codes to enable.",
        "show_default": True,
        "type": tuple,
        "required": False,
        "default": (),
        "multiple": True,
        "cli_options": ["-i"],
    },
    "exclude": {
        "help": "List of all checker codes to disable.",
        "show_default": True,
        "type": tuple,
        "required": False,
        "default": (),
        "multiple": True,
        "cli_options": ["-e"],
    },
}
# Union of the value types an argument can resolve to (`None` for an unset optional argument).
CLI_ARGUMENT_TYPE = Optional[Union[str, bool, tuple]]
159
# Module-level logger; handlers are attached to the root logger by `main`.
_logger = logging.getLogger(__name__)

# Regex backend used to compile the per-rule ignore patterns in `analyze_rule`.
_regex_provider = get_regex_provider()
163
164
def __create_click_option(name: str, props: dict[str, Any]) -> Callable[[_FC], _FC]:
    """Build the `click.option` decorator for a single `CLI_ARGUMENTS` entry.

    Args:
        name: The Python-side argument name; underscores become dashes in the long flag.
        props: The properties of the argument. `help` and `show_default` are mandatory,
            while `default`, `is_flag`, `multiple` and `cli_options` are optional.

    Returns:
        A decorator that attaches the option to a callable or click command.

    """
    long_flag = "--" + name.replace("_", "-")
    # Extra flags (like -r, -s, etc.) are registered as aliases of the long flag.
    extra_flags = props.get("cli_options", [])

    return click.option(
        long_flag,
        *extra_flags,
        help=props["help"],
        show_default=props["show_default"],
        default=props.get("default"),
        is_flag=props.get("is_flag", False),
        multiple=props.get("multiple", False),
    )
180
181
def __main_decorators() -> Callable[[Callable], click.Command]:
    """Return a decorator that turns a function into the fully configured CLI command."""

    def _build_command(f: Callable) -> click.Command:
        # The command object is created first; every option below is attached to it.
        cmd = click.command()(f)
        cmd = help_option("-h", "--help")(cmd)

        # The ini option is attached before the options generated from CLI_ARGUMENTS.
        ini_option = click.option(
            "--ini",
            help="Path to suricata-check.ini file to read configuration from.",
            show_default=True,
            type=str,
            default=None,
        )
        cmd = ini_option(cmd)

        # Walk CLI_ARGUMENTS back to front (bottom to top), attaching one option per entry.
        for arg_name in reversed(CLI_ARGUMENTS):
            cmd = __create_click_option(arg_name, CLI_ARGUMENTS[arg_name])(cmd)

        return cmd

    return _build_command
206
207
@__main_decorators()
def main(**kwargs: dict[str, Any]) -> None:  # noqa: C901, PLR0915
    """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.

    Raises:
        BadParameter: If provided arguments are invalid.

        RuntimeError: If no checkers could be automatically discovered.

    """
    # Look for an INI file and parse it.
    ini_kwargs = get_ini_kwargs(
        str(kwargs["ini"]) if kwargs["ini"] is not None else None  # type: ignore reportUnnecessaryComparison
    )

    # Verify CLI argument types and get CLI arguments or use default arguments.
    # The sources are consulted in list order: the click-provided kwargs first, then the INI values.
    # The local names below must stay equal to the CLI_ARGUMENTS keys: the argument logging
    # further down looks the values up through `locals()`.
    rules: str = __get_verified_kwarg(
        [kwargs, ini_kwargs], "rules"
    )  # pyright: ignore[reportAssignmentType]
    single_rule: Optional[str] = __get_verified_kwarg(
        [kwargs, ini_kwargs], "single_rule"
    )  # pyright: ignore[reportAssignmentType]
    out: str = __get_verified_kwarg(
        [kwargs, ini_kwargs], "out"
    )  # pyright: ignore[reportAssignmentType]
    log_level: LogLevel = __get_verified_kwarg(
        [kwargs, ini_kwargs], "log_level"
    )  # pyright: ignore[reportAssignmentType]
    gitlab: bool = __get_verified_kwarg(
        [kwargs, ini_kwargs], "gitlab"
    )  # pyright: ignore[reportAssignmentType]
    github: bool = __get_verified_kwarg(
        [kwargs, ini_kwargs], "github"
    )  # pyright: ignore[reportAssignmentType]
    evaluate_disabled: bool = __get_verified_kwarg(
        [kwargs, ini_kwargs], "evaluate_disabled"
    )  # pyright: ignore[reportAssignmentType]
    issue_severity: LogLevel = __get_verified_kwarg(
        [kwargs, ini_kwargs], "issue_severity"
    )  # pyright: ignore[reportAssignmentType]
    include_all: bool = __get_verified_kwarg(
        [kwargs, ini_kwargs], "include_all"
    )  # pyright: ignore[reportAssignmentType]
    include: tuple[str, ...] = __get_verified_kwarg(
        [kwargs, ini_kwargs], "include"
    )  # pyright: ignore[reportAssignmentType]
    exclude: tuple[str, ...] = __get_verified_kwarg(
        [kwargs, ini_kwargs], "exclude"
    )  # pyright: ignore[reportAssignmentType]

    # Verify that out argument is valid
    if os.path.exists(out) and not os.path.isdir(out):
        raise click.BadParameter(f"Error: {out} is not a directory.")

    # Verify that log_level argument is valid
    if log_level not in LOG_LEVELS:
        raise click.BadParameter(f"Error: {log_level} is not a valid log level.")

    # Create out directory if non-existent
    if not os.path.exists(out):
        os.makedirs(out)

    # Setup logging from a separate thread
    # Records reach the log file through the queue: QueueHandler -> queue -> QueueListener -> FileHandler.
    queue = multiprocessing.Manager().Queue()
    queue_handler = logging.handlers.QueueHandler(queue)

    # The click handler is attached to the root logger directly, next to the queue handler.
    click_handler = ClickHandler(
        github=github, github_level=getattr(logging, log_level)
    )
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s",
        handlers=(queue_handler, click_handler),
        # Existing root handlers are only replaced when SURICATA_CHECK_FORCE_LOGGING is "TRUE".
        force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", "FALSE") == "TRUE",
    )

    # `delay=True` postpones opening the log file until the first record is emitted.
    file_handler = logging.FileHandler(
        filename=os.path.join(out, "suricata-check.log"),
        delay=True,
    )
    queue_listener = logging.handlers.QueueListener(
        queue,
        file_handler,
        respect_handler_level=True,
    )

    def _at_exit() -> None:
        """Cleans up logging listener and handlers before exiting."""
        queue_listener.enqueue_sentinel()
        queue_listener.stop()
        file_handler.flush()
        file_handler.close()
        # Unregister so an explicit call below is not followed by a second run at interpreter exit.
        atexit.unregister(_at_exit)

    atexit.register(_at_exit)

    queue_listener.start()

    # Log the arguments:
    _logger.info("Running suricata-check with the following arguments:")
    for arg in CLI_ARGUMENTS:
        # Relies on the local variables above being named exactly like the CLI_ARGUMENTS keys.
        _logger.info("%s: %s", arg, locals().get(arg))

    # Log the environment:
    _logger.debug("Platform: %s", sys.platform)
    _logger.debug("Python version: %s", sys.version)
    _logger.debug("suricata-check path: %s", _suricata_check_path)
    _logger.debug("suricata-check version: %s", __version__)
    for package, version in get_dependency_versions().items():
        _logger.debug("Dependency %s version: %s", package, version)

    # The update check runs on its own thread while the rules are being processed.
    threading.Thread(
        target=check_for_update,
    ).start()

    # Verify that include and exclude arguments are valid
    if include_all and len(include) > 0:
        raise click.BadParameter(
            "Error: Cannot use --include-all and --include together."
        )
    if include_all:
        # presumably `get_checkers` treats include entries as regular expressions, making ".*" match every code; verify there
        include = (".*",)

    # Verify that issue_severity argument is valid
    if issue_severity not in LOG_LEVELS:
        raise click.BadParameter(
            f"Error: {issue_severity} is not a valid issue severity or log level."
        )

    checkers = get_checkers(
        include, exclude, issue_severity=getattr(logging, issue_severity)
    )

    if single_rule is not None:
        __main_single_rule(out, single_rule, checkers)

        # Return here so no rules file is processed.
        _at_exit()
        return

    # Check if the rules argument is valid and find the rules file
    rules = find_rules_file(rules)

    output = process_rules_file(rules, evaluate_disabled, checkers=checkers)

    write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)

    # Flush and close the logging pipeline explicitly on the normal exit path.
    _at_exit()
356
357
[docs]
def get_ini_kwargs(path: Optional[str]) -> dict[str, Any]:
    """Read configuration from INI file based on CLI_ARGUMENTS structure.

    Options are read from the `suricata-check` section. Keys are the `CLI_ARGUMENTS`
    names with underscores replaced by dashes. Only options present in the file are returned.

    Args:
        path: Path to the INI file, or `None` to fall back to `suricata-check.ini`
            in the current working directory.

    Returns:
        A mapping from argument name to the converted value. Empty if no path was given
        and the default INI file does not exist.

    Raises:
        BadParameter: If an explicitly provided path does not exist, or if an option
            in the file cannot be converted to the type declared in `CLI_ARGUMENTS`.

    """
    if path is not None and not os.path.exists(path):
        raise click.BadParameter(
            f"Error: INI file provided in {path} but no options loaded"
        )

    # Use the default path if no path was provided
    if path is None:
        path = "suricata-check.ini"
        if not os.path.exists(path):
            return {}

    def _to_tuple(raw: str) -> tuple[str, ...]:
        """Convert a JSON list of strings (or a single JSON string) to a tuple."""
        parsed = json.loads(raw)
        # A bare JSON string is one entry; calling `tuple()` on it would split it into characters.
        if isinstance(parsed, str):
            return (parsed,)
        if not isinstance(parsed, list) or not all(
            isinstance(item, str) for item in parsed
        ):
            raise ValueError(f"expected a JSON list of strings but got {raw!r}")
        return tuple(parsed)

    section = "suricata-check"
    config_parser = configparser.ConfigParser(
        empty_lines_in_values=False,
        default_section=section,
        converters={"tuple": _to_tuple},
    )
    config_parser.read(path)

    ini_kwargs: dict[str, Any] = {}

    # Process each argument defined in CLI_ARGUMENTS
    for arg_name, arg_props in CLI_ARGUMENTS.items():
        ini_key = arg_name.replace("_", "-")
        if not config_parser.has_option(section, ini_key):
            continue

        # Get the value based on the argument type
        arg_type = arg_props["type"]
        try:
            if arg_type is bool:
                value = config_parser.getboolean(section, ini_key)
            elif arg_type is tuple:
                value = config_parser.gettuple(section, ini_key)  # type: ignore reportAttributeAccessIssue
            else:
                value = config_parser.get(section, ini_key)
                if arg_type is str:
                    # Allow string values to be wrapped in double quotes in the INI file.
                    value = value.strip('"')
        except (configparser.Error, ValueError) as error:
            # Covers invalid booleans, malformed JSON and interpolation errors alike.
            raise click.BadParameter(
                f"Error: Invalid value for `{ini_key}` in INI file {path}: {error}"
            ) from error

        ini_kwargs[arg_name] = value

    return ini_kwargs
396
397
def __get_verified_kwarg(
    kwargss: Sequence[dict[str, Any]],
    name: str,
) -> CLI_ARGUMENT_TYPE:
    """Resolve the value of one argument from a prioritized list of sources.

    Args:
        kwargss: The argument sources, highest priority first.
        name: The name of the argument; must be a key of `CLI_ARGUMENTS`.

    Returns:
        The first value that is not the declared default object, or the declared
        default if no source supplies such a value.

    Raises:
        BadParameter: If the selected value is not of the type declared in `CLI_ARGUMENTS`.

    """
    spec = CLI_ARGUMENTS[name]
    default = spec["default"]

    for kwargs in kwargss:
        if name not in kwargs:
            continue

        value = kwargs[name]

        if value is None:
            # NOTE(review): an explicit `None` ends the search immediately, so lower-priority
            # sources are never consulted for it (e.g. `single_rule` from the INI file when
            # the CLI passes `None`); confirm this is intended.
            if not spec["required"] and default is not None:
                return None
            return default

        # Identity, not equality: only the default object itself defers to the next source.
        # presumably so that an explicitly passed value equal to the default still wins; TODO confirm
        if value is default:
            continue

        if not isinstance(value, spec["type"]):
            # Built from adjacent literals so no source indentation leaks into the message.
            raise click.BadParameter(
                f"Error: Argument `{name}` should have a value of type "
                f"`{spec['type']}` but has value {value} of type "
                f"{value.__class__} instead."
            )
        return value

    return default
422
423
def __main_single_rule(
    out: str, single_rule: str, checkers: Optional[Sequence[CheckerInterface]]
) -> None:
    """Check a single rule given on the command line and write the report to `out`.

    Args:
        out: Path to the output folder.
        single_rule: The raw rule text provided by the user.
        checkers: The checkers to be used to check the rule.

    Raises:
        BadParameter: If the rule cannot be parsed or is not a valid rule.

    """
    # `parse` may raise ParsingError (see `process_rules_file`); report it as invalid
    # user input just like a rule that parses to `None`.
    parsing_error: Optional[ParsingError] = None
    try:
        rule: Optional[Rule] = parse(single_rule)
    except ParsingError as error:
        rule = None
        parsing_error = error

    # Verify that a rule was parsed correctly.
    if rule is None or not is_valid_rule(rule):
        msg = f"Error parsing rule from user input: {single_rule}"
        _logger.critical(msg)
        raise click.BadParameter(f"Error: {msg}") from parsing_error

    _logger.debug("Processing rule: %s", get_rule_option(rule, "sid"))

    rule_report = analyze_rule(rule, checkers=checkers)

    write_output(OutputReport(rules=[rule_report]), out)
445
446
[docs]
def process_rules_file(  # noqa: C901, PLR0912, PLR0915
    rules: str,
    evaluate_disabled: bool,
    checkers: Optional[Sequence[CheckerInterface]] = None,
) -> OutputReport:
    """Processes a rule file and returns a report of rules and their issues.

    Lines ending in a backslash are joined with the following line(s) into one rule.
    Blank lines are skipped. Lines starting with `#` are skipped unless
    `evaluate_disabled` is set and the line parses as a rule.

    Args:
        rules: A path to a Suricata rules file.
        evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
        checkers: The checkers to be used when processing the rule file.

    Returns:
        An `OutputReport` holding one `RuleReport` per successfully analyzed rule,
        with its line range, plus a summary over all rules.

    Raises:
        RuntimeError: If no checkers could be automatically discovered.

    """
    if checkers is None:
        checkers = get_checkers()

    output = OutputReport()

    with open(
        os.path.normpath(rules),
        buffering=io.DEFAULT_BUFFER_SIZE,
    ) as rules_fh:
        if len(checkers) == 0:
            msg = "No checkers provided for processing rules."
            _logger.error(msg)
            raise RuntimeError(msg)

        _logger.info("Processing rule file: %s", rules)

        collected_multiline_parts: Optional[str] = None
        multiline_begin_number: Optional[int] = None

        # Iterate the file handle directly instead of materializing all lines first.
        for number, line in enumerate(rules_fh, start=1):
            # First work on collecting and parsing multiline rules
            # NOTE(review): continuation lines are collected before the comment check below,
            # so a commented-out multiline rule appears to be parsed regardless of
            # `evaluate_disabled`; confirm whether that is intended.
            if line.rstrip("\r\n").endswith("\\"):
                multiline_part = line.rstrip("\r\n")[:-1]

                if collected_multiline_parts is None:
                    collected_multiline_parts = multiline_part
                    multiline_begin_number = number
                else:
                    collected_multiline_parts += multiline_part.lstrip()

                continue

            # Process final part of multiline rule if one is being collected
            if collected_multiline_parts is not None:
                collected_multiline_parts += line.lstrip()

                rule_line = collected_multiline_parts.strip()
                line_begin = (
                    multiline_begin_number
                    if multiline_begin_number is not None
                    else number
                )

                # Reset the multiline state right away, so that a rule failing to parse
                # below cannot leak its begin line into the next rule's report.
                collected_multiline_parts = None
                multiline_begin_number = None
            # If no multiline rule is being collected process as a potential single line rule
            else:
                if len(line.strip()) == 0:
                    continue

                if line.strip().startswith("#"):
                    if not evaluate_disabled:
                        # Skip the rule
                        continue

                    # Verify that this line is a rule and not a comment
                    try:
                        disabled_rule = parse(line)
                    except ParsingError:
                        # A comment the parser chokes on is still just a comment.
                        disabled_rule = None

                    if disabled_rule is None:
                        # Log the comment since it may be a invalid rule
                        _logger.warning(
                            "Ignoring comment on line %i: %s", number, line
                        )
                        continue

                rule_line = line.strip()
                line_begin = number

            try:
                rule: Optional[Rule] = parse(rule_line)
            except ParsingError:
                _logger.error(
                    "Internal error in parsing of rule on line %i: %s",
                    number,
                    rule_line,
                )
                rule = None

            # Parse comment and potential ignore comment to ignore rules
            ignore = __parse_type_ignore(rule)

            # Verify that a rule was parsed correctly.
            if rule is None:
                _logger.error("Error parsing rule on line %i: %s", number, rule_line)
                continue

            if not is_valid_rule(rule):
                _logger.error("Invalid rule on line %i: %s", number, rule_line)
                continue

            _logger.debug(
                "Processing rule: %s on line %i", get_rule_option(rule, "sid"), number
            )

            rule_report: RuleReport = analyze_rule(
                rule,
                checkers=checkers,
                ignore=ignore,
            )
            rule_report.line_begin = line_begin
            rule_report.line_end = number

            output.rules.append(rule_report)

        # A trailing backslash on the last line leaves a rule without its final part.
        if collected_multiline_parts is not None:
            _logger.warning(
                "Ignoring unterminated multiline rule starting on line %i: %s",
                multiline_begin_number,
                collected_multiline_parts,
            )

    _logger.info("Completed processing rule file: %s", rules)

    output.summary = summarize_output(output, checkers)

    return output
570
571
def __parse_type_ignore(rule: Optional[Rule]) -> Optional[Sequence[str]]:
    """Extract the issue-code patterns a rule asks to ignore.

    The patterns are read from the `suricata-check` suboption of the rule's `metadata`
    option, as a comma-separated and optionally double-quoted list.

    Args:
        rule: The parsed rule, or `None` if parsing failed.

    Returns:
        `None` if no rule was given, otherwise the list of patterns, which is empty
        when the rule does not carry the suboption.

    """
    if rule is None:
        return None

    ignore_value = get_rule_suboption(rule, "metadata", "suricata-check")
    if ignore_value is None:
        return []

    # Strip every entry and drop empty ones: each entry is later compiled as a regular
    # expression, and an empty pattern would match (and thereby suppress) every issue code.
    codes = (code.strip() for code in ignore_value.strip(' "').split(","))
    return [code for code in codes if code]
581
582
[docs]
def analyze_rule(
    rule: Rule,
    checkers: Optional[Sequence[CheckerInterface]] = None,
    ignore: Optional[Sequence[str]] = None,
) -> RuleReport:
    """Checks a rule and returns a report containing the rule and the issues found.

    Args:
        rule: The rule to be checked.
        checkers: The checkers to be used to check the rule.
            If `None`, the checkers returned by `get_checkers()` are used.
        ignore: Regular expressions to match checker codes to ignore

    Returns:
        A `RuleReport` for the rule, holding the issues that were not ignored
        and a summary of those issues.

    Raises:
        InvalidRuleError: If the rule does not follow the Suricata syntax.

    """
    if not is_valid_rule(rule):
        raise InvalidRuleError(rule.raw)

    check_rule_option_recognition(rule)

    if checkers is None:
        checkers = get_checkers()

    rule_report: RuleReport = RuleReport(rule=rule)

    # Only mention ignore patterns when there are any, and at debug level:
    # this function runs once per rule.
    if ignore:
        _logger.debug("Ignoring issue codes matching %s for rule: %s", ignore, rule.raw)

    compiled_ignore = (
        [_regex_provider.compile(r) for r in ignore] if ignore is not None else []
    )

    for checker in checkers:
        # A failing checker must not prevent the remaining checkers from running.
        try:
            issues = checker.check_rule(rule)
            if compiled_ignore:
                # Drop every issue whose code matches at least one ignore pattern.
                issues = [
                    issue
                    for issue in issues
                    if not any(
                        pattern.match(issue.code) for pattern in compiled_ignore
                    )
                ]
            rule_report.add_issues(issues)
        except Exception as exception:  # noqa: BLE001
            _logger.warning(
                "Failed to run %s on rule: %s",
                checker.__class__.__name__,
                rule.raw,
                extra={"exception": exception},
            )

    rule_report.summary = summarize_rule(rule_report, checkers)

    return rule_report
636
637
# Run the CLI when this module is executed directly as a script.
if __name__ == "__main__":
    main()