1"""The `suricata_check._suricata_check` module contains the command line utility and the main program logic."""
2
3import atexit
4import configparser
5import io
6import json
7import logging
8import logging.handlers
9import multiprocessing
10import os
11import sys
12import threading
13from collections.abc import Sequence
14from typing import (
15 Any,
16 Callable,
17 Literal,
18 Optional,
19 TypeVar,
20 Union,
21)
22
23import click
24
25_AnyCallable = Callable[..., Any]
26_FC = TypeVar("_FC", bound=Union[_AnyCallable, click.Command])
27
28LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR")
29LogLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]
30
31# Define all command line arguments and their properties
32CLI_ARGUMENTS: dict[str, dict[str, Any]] = {
33 "rules": {
34 "help": "Path to Suricata rules to provide check on.",
35 "show_default": True,
36 "type": str,
37 "required": False,
38 "default": ".",
39 "cli_options": ["-r"],
40 },
41 "single_rule": {
42 "help": "A single Suricata rule to be checked",
43 "show_default": False,
44 "type": str,
45 "required": False,
46 "default": None,
47 "cli_options": ["-s"],
48 },
49 "out": {
50 "help": "Path to suricata-check output folder.",
51 "show_default": True,
52 "type": str,
53 "required": False,
54 "default": ".",
55 "cli_options": ["-o"],
56 },
57 "log_level": {
58 "help": f"Verbosity level for logging. Can be one of {LOG_LEVELS}",
59 "show_default": True,
60 "type": str,
61 "required": False,
62 "default": "INFO",
63 },
64 "gitlab": {
65 "help": "Flag to create CodeClimate output report for GitLab CI/CD.",
66 "show_default": True,
67 "type": bool,
68 "required": False,
69 "default": False,
70 "is_flag": True,
71 },
72 "github": {
73 "help": "Flag to write workflow commands to stdout for GitHub CI/CD.",
74 "show_default": True,
75 "type": bool,
76 "required": False,
77 "default": False,
78 "is_flag": True,
79 },
80 "evaluate_disabled": {
81 "help": "Flag to evaluate disabled rules.",
82 "show_default": True,
83 "type": bool,
84 "required": False,
85 "default": False,
86 "is_flag": True,
87 },
88 "issue_severity": {
89 "help": f"Verbosity level for detected issues. Can be one of {LOG_LEVELS}",
90 "show_default": True,
91 "type": str,
92 "required": False,
93 "default": "INFO",
94 },
95 "include_all": {
96 "help": "Flag to indicate all checker codes should be enabled.",
97 "show_default": True,
98 "type": bool,
99 "required": False,
100 "default": False,
101 "is_flag": True,
102 "cli_options": ["-a"],
103 },
104 "include": {
105 "help": "List of all checker codes to enable. Regexes can be provided.",
106 "show_default": True,
107 "type": tuple,
108 "required": False,
109 "default": (),
110 "multiple": True,
111 "cli_options": ["-i"],
112 },
113 "exclude": {
114 "help": "List of all checker codes to disable. Regexes can be provided.",
115 "show_default": True,
116 "type": tuple,
117 "required": False,
118 "default": (),
119 "multiple": True,
120 "cli_options": ["-e"],
121 },
122}
123CLI_ARGUMENT_TYPE = Optional[Union[str, bool, tuple]]
124
125# Add suricata-check to the front of the PATH, such that the version corresponding to the CLI is used.
126_suricata_check_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
127if sys.path[0] != _suricata_check_path:
128 sys.path.insert(0, _suricata_check_path)
129
130from suricata_check._checkers import get_checkers # noqa: E402
131from suricata_check._output import ( # noqa: E402
132 summarize_output,
133 summarize_rule,
134 write_output,
135)
136from suricata_check._version import ( # noqa: E402
137 __version__,
138 check_for_update,
139 get_dependency_versions,
140)
141from suricata_check.checkers.interface import CheckerInterface # noqa: E402
142from suricata_check.utils._click import ClickHandler, help_option # noqa: E402
143from suricata_check.utils._path import find_rules_file # noqa: E402
144from suricata_check.utils.checker import ( # noqa: E402
145 check_rule_option_recognition,
146 get_rule_option,
147 get_rule_suboption,
148)
149from suricata_check.utils.checker_typing import ( # noqa: E402
150 InvalidRuleError,
151 OutputReport,
152 RuleReport,
153)
154from suricata_check.utils.regex import is_valid_rule # noqa: E402
155from suricata_check.utils.regex_provider import get_regex_provider # noqa: E402
156from suricata_check.utils.rule import ParsingError, Rule, parse # noqa: E402
157
158_logger = logging.getLogger(__name__)
159
160_regex_provider = get_regex_provider()
161
162
163def __create_click_option(name: str, props: dict[str, Any]) -> Callable[[_FC], _FC]:
164 """Create a click.option decorator from argument properties."""
165 kwargs = {
166 "help": props["help"],
167 "show_default": props["show_default"],
168 "default": props.get("default"),
169 "is_flag": props.get("is_flag", False),
170 "multiple": props.get("multiple", False),
171 }
172
173 # Add any additional CLI options (like -r, -s, etc.)
174 cli_opts = props.get("cli_options", [])
175 args = [f"--{name.replace('_', '-')}", *cli_opts]
176
177 return click.option(*args, **kwargs)
178
179
180def __main_decorators() -> Callable[[Callable], click.Command]:
181 """Create the CLI command with all options."""
182
183 def decorator(f: Callable) -> click.Command:
184 # Apply all options in reverse order (bottom to top)
185 command = click.command()(f)
186 command = help_option("-h", "--help")(command)
187
188 # Add ini option first since it's needed before processing other options
189 command = click.option(
190 "--ini",
191 help="Path to suricata-check.ini file to read configuration from.",
192 show_default=True,
193 type=str,
194 default=None,
195 )(command)
196
197 # Apply options from CLI_ARGUMENTS
198 for name, props in CLI_ARGUMENTS.items():
199 command = __create_click_option(name, props)(command)
200
201 return command
202
203 return decorator
204
205
206@__main_decorators()
207def main(**kwargs: dict[str, Any]) -> None: # noqa: C901, PLR0915
208 """The `suricata-check` command processes all rules inside a rules file and outputs a list of detected issues.
209
210 Check the CLI usage documentation for a full overview of how to use the CLI: https://suricata-check.teuwen.net/cli_usage.html
211 """
212 # Look for a ini file and parse it.
213 ini_kwargs = get_ini_kwargs(
214 str(kwargs["ini"]) if kwargs["ini"] is not None else None, # type: ignore reportUnnecessaryComparison
215 )
216
217 # Verify CLI argument types and get CLI arguments or use default arguments
218 rules: str = __get_verified_kwarg(
219 [kwargs, ini_kwargs],
220 "rules",
221 ) # pyright: ignore[reportAssignmentType]
222 single_rule: Optional[str] = __get_verified_kwarg(
223 [kwargs, ini_kwargs],
224 "single_rule",
225 ) # pyright: ignore[reportAssignmentType]
226 out: str = __get_verified_kwarg(
227 [kwargs, ini_kwargs],
228 "out",
229 ) # pyright: ignore[reportAssignmentType]
230 log_level: LogLevel = __get_verified_kwarg(
231 [kwargs, ini_kwargs],
232 "log_level",
233 ) # pyright: ignore[reportAssignmentType]
234 gitlab: bool = __get_verified_kwarg(
235 [kwargs, ini_kwargs],
236 "gitlab",
237 ) # pyright: ignore[reportAssignmentType]
238 github: bool = __get_verified_kwarg(
239 [kwargs, ini_kwargs],
240 "github",
241 ) # pyright: ignore[reportAssignmentType]
242 evaluate_disabled: bool = __get_verified_kwarg(
243 [kwargs, ini_kwargs],
244 "evaluate_disabled",
245 ) # pyright: ignore[reportAssignmentType]
246 issue_severity: LogLevel = __get_verified_kwarg(
247 [kwargs, ini_kwargs],
248 "issue_severity",
249 ) # pyright: ignore[reportAssignmentType]
250 include_all: bool = __get_verified_kwarg(
251 [kwargs, ini_kwargs],
252 "include_all",
253 ) # pyright: ignore[reportAssignmentType]
254 include: tuple[str, ...] = __get_verified_kwarg(
255 [kwargs, ini_kwargs],
256 "include",
257 ) # pyright: ignore[reportAssignmentType]
258 exclude: tuple[str, ...] = __get_verified_kwarg(
259 [kwargs, ini_kwargs],
260 "exclude",
261 ) # pyright: ignore[reportAssignmentType]
262
263 # Verify that out argument is valid
264 if os.path.exists(out) and not os.path.isdir(out):
265 raise click.BadParameter(f"Error: {out} is not a directory.")
266
267 # Verify that log_level argument is valid
268 if log_level not in LOG_LEVELS:
269 raise click.BadParameter(f"Error: {log_level} is not a valid log level.")
270
271 # Create out directory if non-existent
272 if not os.path.exists(out):
273 os.makedirs(out)
274
275 # Setup logging from a seperate thread
276 queue = multiprocessing.Manager().Queue()
277 queue_handler = logging.handlers.QueueHandler(queue)
278
279 click_handler = ClickHandler(
280 github=github,
281 github_level=getattr(logging, log_level),
282 )
283 logging.basicConfig(
284 level=log_level,
285 format="%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s",
286 handlers=(queue_handler, click_handler),
287 force=os.environ.get("SURICATA_CHECK_FORCE_LOGGING", "FALSE") == "TRUE",
288 )
289
290 file_handler = logging.FileHandler(
291 filename=os.path.join(out, "suricata-check.log"),
292 delay=True,
293 )
294 queue_listener = logging.handlers.QueueListener(
295 queue,
296 file_handler,
297 respect_handler_level=True,
298 )
299
300 def _at_exit() -> None:
301 """Cleans up logging listener and handlers before exiting."""
302 queue_listener.enqueue_sentinel()
303 queue_listener.stop()
304 file_handler.flush()
305 file_handler.close()
306 atexit.unregister(_at_exit)
307
308 atexit.register(_at_exit)
309
310 queue_listener.start()
311
312 # Log the arguments:
313 _logger.info("Running suricata-check with the following arguments:")
314 for arg in CLI_ARGUMENTS:
315 _logger.debug("%s: %s", arg, locals().get(arg))
316
317 # Log the environment:
318 _logger.debug("Platform: %s", sys.platform)
319 _logger.debug("Python version: %s", sys.version)
320 _logger.debug("suricata-check path: %s", _suricata_check_path)
321 _logger.debug("suricata-check version: %s", __version__)
322 for package, version in get_dependency_versions().items():
323 _logger.debug("Dependency %s version: %s", package, version)
324
325 threading.Thread(
326 target=check_for_update,
327 ).start()
328
329 # Verify that include and exclude arguments are valid
330 if include_all and len(include) > 0:
331 raise click.BadParameter(
332 "Error: Cannot use --include-all and --include together.",
333 )
334 if include_all:
335 include = (".*",)
336
337 # Verify that issue_severity argument is valid
338 if issue_severity not in LOG_LEVELS:
339 raise click.BadParameter(
340 f"Error: {issue_severity} is not a valid issue severity or log level.",
341 )
342
343 checkers = get_checkers(
344 include,
345 exclude,
346 issue_severity=getattr(logging, issue_severity),
347 )
348
349 if single_rule is not None:
350 __main_single_rule(out, single_rule, checkers)
351
352 # Return here so no rules file is processed.
353 _at_exit()
354 return
355
356 # Check if the rules argument is valid and find the rules file
357 rules = find_rules_file(rules)
358
359 output = process_rules_file(rules, evaluate_disabled, checkers=checkers)
360
361 write_output(output, out, gitlab=gitlab, github=github, rules_file=rules)
362
363 _at_exit()
364
365
[docs]
366def get_ini_kwargs(path: Optional[str]) -> dict[str, Any]:
367 """Read configuration from INI file based on CLI_ARGUMENTS structure."""
368 ini_kwargs: dict[str, Any] = {}
369 if path is not None and not os.path.exists(path):
370 raise click.BadParameter(
371 f"Error: INI file provided in {path} but no options loaded",
372 )
373
374 # Use the default path if no path was provided
375 if path is None:
376 path = "suricata-check.ini"
377 if not os.path.exists(path):
378 return {}
379
380 config_parser = configparser.ConfigParser(
381 empty_lines_in_values=False,
382 default_section="suricata-check",
383 converters={"tuple": lambda x: tuple(json.loads(x))},
384 )
385 config_parser.read(path)
386
387 # Process each argument defined in CLI_ARGUMENTS
388 for arg_name, arg_props in CLI_ARGUMENTS.items():
389 ini_key = arg_name.replace("_", "-")
390 if not config_parser.has_option("suricata-check", ini_key):
391 continue
392
393 # Get the value based on the argument type
394 if arg_props["type"] is bool:
395 ini_kwargs[arg_name] = config_parser.getboolean("suricata-check", ini_key)
396 elif arg_props["type"] is tuple:
397 ini_kwargs[arg_name] = config_parser.gettuple("suricata-check", ini_key) # type: ignore reportAttributeAccessIssue
398 else:
399 ini_kwargs[arg_name] = config_parser.get("suricata-check", ini_key)
400 if arg_props["type"] is str:
401 ini_kwargs[arg_name] = ini_kwargs[arg_name].strip('"')
402
403 return ini_kwargs
404
405
406def __get_verified_kwarg(
407 kwargss: Sequence[dict[str, Any]],
408 name: str,
409) -> CLI_ARGUMENT_TYPE:
410 for kwargs in kwargss:
411 if name in kwargs:
412 if kwargs[name] is None:
413 if (
414 not CLI_ARGUMENTS[name]["required"]
415 and CLI_ARGUMENTS[name]["default"] is not None
416 ):
417 return None
418 return CLI_ARGUMENTS[name]["default"]
419
420 if kwargs[name] is not CLI_ARGUMENTS[name]["default"]:
421 if not isinstance(kwargs[name], CLI_ARGUMENTS[name]["type"]):
422 raise click.BadParameter(
423 f"""Error: \
424 Argument `{name}` should have a value of type `{CLI_ARGUMENTS[name]["type"]}` \
425 but has value {kwargs[name]} of type {kwargs[name].__class__} instead.""",
426 )
427 return kwargs[name]
428
429 return CLI_ARGUMENTS[name]["default"]
430
431
432def __main_single_rule(
433 out: str,
434 single_rule: str,
435 checkers: Optional[Sequence[CheckerInterface]],
436) -> None:
437 rule: Optional[Rule] = parse(single_rule)
438
439 # Verify that a rule was parsed correctly.
440 if rule is None:
441 msg = f"Error parsing rule from user input: {single_rule}"
442 _logger.critical(msg)
443 raise click.BadParameter(f"Error: {msg}")
444
445 if not is_valid_rule(rule):
446 msg = f"Error parsing rule from user input: {single_rule}"
447 _logger.critical(msg)
448 raise click.BadParameter(f"Error: {msg}")
449
450 _logger.debug("Processing rule: %s", get_rule_option(rule, "sid"))
451
452 rule_report = analyze_rule(rule, checkers=checkers)
453
454 write_output(OutputReport(rules=[rule_report]), out)
455
456
[docs]
457def process_rules_file( # noqa: C901, PLR0912, PLR0915
458 rules: str,
459 evaluate_disabled: bool,
460 checkers: Optional[Sequence[CheckerInterface]] = None,
461) -> OutputReport:
462 """Processes a rule file and returns a list of rules and their issues.
463
464 Args:
465 rules: A path to a Suricata rules file.
466 evaluate_disabled: A flag indicating whether disabled rules should be evaluated.
467 checkers: The checkers to be used when processing the rule file.
468
469 Returns:
470 A list of rules and their issues.
471
472 Raises:
473 RuntimeError: If no checkers could be automatically discovered.
474
475 """
476 if checkers is None:
477 checkers = get_checkers()
478
479 output = OutputReport()
480
481 with (
482 open(
483 os.path.normpath(rules),
484 buffering=io.DEFAULT_BUFFER_SIZE,
485 ) as rules_fh,
486 ):
487 if len(checkers) == 0:
488 msg = "No checkers provided for processing rules."
489 _logger.error(msg)
490 raise RuntimeError(msg)
491
492 _logger.info("Processing rule file: %s", rules)
493
494 collected_multiline_parts: Optional[str] = None
495 multiline_begin_number: Optional[int] = None
496
497 for number, line in enumerate(rules_fh.readlines(), start=1):
498 # First work on collecting and parsing multiline rules
499 if line.rstrip("\r\n").endswith("\\"):
500 multiline_part = line.rstrip("\r\n")[:-1]
501
502 if collected_multiline_parts is None:
503 collected_multiline_parts = multiline_part
504 multiline_begin_number = number
505 else:
506 collected_multiline_parts += multiline_part.lstrip()
507
508 continue
509
510 # Process final part of multiline rule if one is being collected
511 if collected_multiline_parts is not None:
512 collected_multiline_parts += line.lstrip()
513
514 rule_line = collected_multiline_parts.strip()
515
516 collected_multiline_parts = None
517 # If no multiline rule is being collected process as a potential single line rule
518 else:
519 if len(line.strip()) == 0:
520 continue
521
522 if line.strip().startswith("#"):
523 if evaluate_disabled:
524 # Verify that this line is a rule and not a comment
525 if parse(line) is None:
526 # Log the comment since it may be a invalid rule
527 _logger.warning(
528 "Ignoring comment on line %i: %s",
529 number,
530 line,
531 )
532 continue
533 else:
534 # Skip the rule
535 continue
536
537 rule_line = line.strip()
538
539 try:
540 rule: Optional[Rule] = parse(rule_line)
541 except ParsingError:
542 _logger.error(
543 "Internal error in parsing of rule on line %i: %s",
544 number,
545 rule_line,
546 )
547 rule = None
548
549 # Parse comment and potential ignore comment to ignore rules
550 ignore = __parse_type_ignore(rule)
551
552 # Verify that a rule was parsed correctly.
553 if rule is None:
554 _logger.error("Error parsing rule on line %i: %s", number, rule_line)
555 continue
556
557 if not is_valid_rule(rule):
558 _logger.error("Invalid rule on line %i: %s", number, rule_line)
559 continue
560
561 _logger.debug(
562 "Processing rule: %s on line %i",
563 get_rule_option(rule, "sid"),
564 number,
565 )
566
567 rule_report: RuleReport = analyze_rule(
568 rule,
569 checkers=checkers,
570 ignore=ignore,
571 )
572 rule_report.line_begin = multiline_begin_number or number
573 rule_report.line_end = number
574
575 output.rules.append(rule_report)
576
577 multiline_begin_number = None
578
579 _logger.info("Completed processing rule file: %s", rules)
580
581 output.summary = summarize_output(output, checkers)
582
583 return output
584
585
586def __parse_type_ignore(rule: Optional[Rule]) -> Optional[Sequence[str]]:
587 if rule is None:
588 return None
589
590 ignore_value = get_rule_suboption(rule, "metadata", "suricata-check")
591 if ignore_value is None:
592 return []
593
594 return ignore_value.strip(' "').split(",")
595
596
[docs]
597def analyze_rule(
598 rule: Rule,
599 checkers: Optional[Sequence[CheckerInterface]] = None,
600 ignore: Optional[Sequence[str]] = None,
601) -> RuleReport:
602 """Checks a rule and returns a dictionary containing the rule and a list of issues found.
603
604 Args:
605 rule: The rule to be checked.
606 checkers: The checkers to be used to check the rule.
607 ignore: Regular expressions to match checker codes to ignore
608
609 Returns:
610 A list of issues found in the rule.
611 Each issue is typed as a `dict`.
612
613 Raises:
614 InvalidRuleError: If the rule does not follow the Suricata syntax.
615
616 """
617 if not is_valid_rule(rule):
618 raise InvalidRuleError(rule.raw)
619
620 check_rule_option_recognition(rule)
621
622 if checkers is None:
623 checkers = get_checkers()
624
625 rule_report: RuleReport = RuleReport(rule=rule)
626
627 compiled_ignore = (
628 [_regex_provider.compile(r) for r in ignore] if ignore is not None else []
629 )
630
631 for checker in checkers:
632 try:
633 issues = checker.check_rule(rule)
634 for r in compiled_ignore:
635 issues = list(filter(lambda issue: r.match(issue.code) is None, issues))
636 rule_report.add_issues(issues)
637 except Exception as exception: # noqa: BLE001
638 _logger.warning(
639 "Failed to run %s on rule: %s",
640 checker.__class__.__name__,
641 rule.raw,
642 extra={"exception": exception},
643 )
644
645 rule_report.summary = summarize_rule(rule_report, checkers)
646
647 return rule_report
648
649
650if __name__ == "__main__":
651 main()