diff --git a/src/ldaptool/_main.py b/src/ldaptool/_main.py index 60f081e..38fabb0 100644 --- a/src/ldaptool/_main.py +++ b/src/ldaptool/_main.py @@ -14,7 +14,7 @@ from ldaptool._utils import argclasses from ldaptool._utils.ldap import Result, SizeLimitExceeded -class TableOutput(enum.StrEnum): +class TableOutput(enum.Enum): MARKDOWN = "markdown" CSV = "csv" HTML = "html" @@ -42,19 +42,27 @@ class Arguments(search.Arguments): help="Markdown table output - requires list of attributes", ), ) - table_output: typing.Optional[TableOutput] = None html: bool = dataclasses.field( default=False, metadata=argclasses.arg( help="HTML table output - requires list of attributes", ), ) + table_output: typing.Optional[TableOutput] = None sort: bool = dataclasses.field( default=False, metadata=argclasses.arg( help="Sorted table output - defaults to markdown --table unless --csv is given", ), ) + json: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="Use full json output"), + ) + human: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"), + ) def __post_init__(self) -> None: super(Arguments, self).__post_init__() # super() not working here, unclear why. @@ -98,7 +106,10 @@ class _Context: self.config = search.Config.load() except Exception as e: raise SystemExit(f"config error: {e}") - self.arguments = arguments_p.from_args(args) + try: + self.arguments = arguments_p.from_args(args) + except decode.InvalidStep as e: + raise SystemExit(f"invalid arguments: {e}") def run(self) -> None: # starting the search sets the base we want to print @@ -141,7 +152,7 @@ class _Context: continue # normal entry assert not isinstance(entry, list) - obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry)) + obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry)) yield tuple(obj.get(key, "") for key in column_keys) except SizeLimitExceeded as e: raise SystemExit(f"Error: {e}") @@ -201,8 +212,13 @@ class _Context: # normal entry assert not isinstance(entry, list) num_entries += 1 - obj = decoder.read(dn=dn, entry=entry) - decoder.emit(dn=dn, entry=obj) + if ldif_output: + decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream) + elif self.arguments.human: + decoder.read_and_emit_human(dn=dn, entry=entry, file=stream) + else: + assert self.arguments.json + decoder.read_and_emit_json(dn=dn, entry=entry, file=stream) except SizeLimitExceeded as e: raise SystemExit(f"Error: {e}") diff --git a/src/ldaptool/_utils/argclasses.py b/src/ldaptool/_utils/argclasses.py index 7c65fb4..14356fa 100644 --- a/src/ldaptool/_utils/argclasses.py +++ b/src/ldaptool/_utils/argclasses.py @@ -74,6 +74,9 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments") @dataclasses.dataclass(slots=True, kw_only=True) class BaseArguments: + def __post_init__(self) -> None: + pass + @classmethod def add_fields_to_parser( cls: type[_TArgs], diff --git a/src/ldaptool/decode/__init__.py b/src/ldaptool/decode/__init__.py index 423e59a..6d98867 100644 --- a/src/ldaptool/decode/__init__.py +++ b/src/ldaptool/decode/__init__.py @@ -1,10 +1,12 @@ from __future__ import annotations from ._decoder import Attribute, Decoder +from ._postprocess import InvalidStep from .arguments import Arguments __all__ = [ "Arguments", "Attribute", "Decoder", + "InvalidStep", ] diff --git a/src/ldaptool/decode/_decoder.py b/src/ldaptool/decode/_decoder.py index 128036b..28ef42c 100644 --- a/src/ldaptool/decode/_decoder.py +++ b/src/ldaptool/decode/_decoder.py @@ -8,8 +8,6 @@ import sys import typing import uuid -from ldaptool._utils.dninfo import DNInfo - from . import _types from .arguments import Arguments @@ -122,16 +120,16 @@ class Attribute: def _base64_value(self) -> str: return base64.b64encode(self.raw).decode("ascii") - def print(self) -> None: + def print(self, *, file: typing.IO[str] = sys.stdout) -> None: if not self.decoded is None: comment = self.utf8_clean if comment is None: comment = self._base64_value - print(f"{self.name}: {self.decoded} # {comment}") + print(f"{self.name}: {self.decoded} # {comment}", file=file) elif not self.utf8_clean is None: - print(f"{self.name}: {self.utf8_clean}") + print(f"{self.name}: {self.utf8_clean}", file=file) else: - print(f"{self.name}:: {self._base64_value}") + print(f"{self.name}:: {self._base64_value}", file=file) def to_json(self) -> dict[str, typing.Any]: item: dict[str, typing.Any] = {} @@ -175,55 +173,69 @@ class Decoder: name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values] for name, raw_values in entry.items() } - if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath: - dninfo = DNInfo(dn=dn) - if self.arguments.dndomain: - decoded_entry["dndomain"] = [ - Attribute.fake_attribute("dndomain", dninfo.domain), - ] - if self.arguments.dnpath: - decoded_entry["dnpath"] = [ - Attribute.fake_attribute("dnpath", dninfo.path), - ] - if self.arguments.dnfullpath: - decoded_entry["dnfullpath"] = [ - Attribute.fake_attribute("dnfullpath", dninfo.full_path), + + for attr, post_processes in self.arguments.post_process.items(): + if attr == "dn": + values = [dn] + else: + attrs = decoded_entry.get(attr, None) + if attrs is None: + continue + values = [at.human() for at in attrs] + for column, post_process in post_processes.items(): + decoded_entry[column] = [ + Attribute.fake_attribute(column, post_process.process(value)) for value in values ] + return decoded_entry - def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]: + def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]: emit: dict[str, typing.Any] = dict(dn=dn) - for name, attrs in entry.items(): + for name, attrs in obj.items(): emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs) return emit - def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]: + def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None: + emit = self.human(dn=dn, obj=obj) + json.dump(emit, file, ensure_ascii=False) + print(file=file) # terminate output dicts by newline + + def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None: + self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file) + + def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]: emit: dict[str, typing.Any] = dict(dn=dn) - for name, attrs in entry.items(): + for name, attrs in obj.items(): emit[name] = [attr.to_json() for attr in attrs] return emit - def _emit_json(self, *, dn: str, entry: TDecoded) -> None: - if self.arguments.human: - emit = self.human(dn=dn, entry=entry) + def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None: + emit = self.json(dn=dn, obj=obj) + json.dump(emit, file, ensure_ascii=False) + print(file=file) # terminate output dicts by newline + + def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None: + self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file) + + def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None: + print(f"dn: {dn}", file=file) + attrs: typing.Optional[list[Attribute]] + if not self.arguments.attributes: + # show all attributes - use order from server + for attrs in obj.values(): + for attr in attrs: + attr.print(file=file) else: - emit = self.json(dn=dn, entry=entry) - json.dump(emit, sys.stdout, ensure_ascii=False) - print() # terminate output dicts by newline + # only selected columns; use given order + for column in self.arguments.columns_keys: + if column == "dn": + continue # already printed dn + attrs = obj.get(column, None) + if attrs is None: + continue + for attr in attrs: + attr.print(file=file) + print(file=file) # separate entries with newlines - def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None: - print(f"dn: {dn}") - for attrs in entry.values(): - for attr in attrs: - attr.print() - print() # separate entries with newlines - - def emit(self, *, dn: str, entry: TDecoded) -> None: - if self.arguments.human or self.arguments.json: - self._emit_json(dn=dn, entry=entry) - else: - self._emit_ldif(dn=dn, entry=entry) - - def handle(self, *, dn: str, entry: TEntry) -> None: - entry_attrs = self.read(dn=dn, entry=entry) - self.emit(dn=dn, entry=entry_attrs) + def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None: + self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file) diff --git a/src/ldaptool/decode/_postprocess.py b/src/ldaptool/decode/_postprocess.py new file mode 100644 index 0000000..d7529e6 --- /dev/null +++ b/src/ldaptool/decode/_postprocess.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import abc +import dataclasses + +from ldaptool._utils.dninfo import DNInfo + + +class Step(abc.ABC): + __slots__ = () + + @abc.abstractmethod + def step(self, value: str) -> str: + ... + + +@dataclasses.dataclass(slots=True) +class MaxLength(Step): + limit: int + + def step(self, value: str) -> str: + if not self.limit or len(value) <= self.limit: + return value + return value[: self.limit - 1] + "…" + + +@dataclasses.dataclass(slots=True) +class DNDomain(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.domain + + +@dataclasses.dataclass(slots=True) +class DNPath(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.path + + +@dataclasses.dataclass(slots=True) +class DNFullPath(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.full_path + + +_STEPS = { + "domain": DNDomain(), + "path": DNPath(), + "fullpath": DNFullPath(), +} + + +class InvalidStep(Exception): + pass + + +@dataclasses.dataclass(slots=True) +class PostProcess: + steps: list[Step] + + def process(self, value: str) -> str: + for step in self.steps: + value = step.step(value) + return value + + +def parse_steps(steps: list[str]) -> PostProcess: + max_len = 0 + try: + max_len = int(steps[-1]) + steps.pop() + except ValueError: + pass + result = [] + for step in steps: + step_i = _STEPS.get(step, None) + if step_i is None: + raise InvalidStep(f"Unknown post-processing step {step!r}") + result.append(step_i) + if max_len: + result.append(MaxLength(max_len)) + return PostProcess(result) diff --git a/src/ldaptool/decode/arguments.py b/src/ldaptool/decode/arguments.py index f1e44aa..90e324e 100644 --- a/src/ldaptool/decode/arguments.py +++ b/src/ldaptool/decode/arguments.py @@ -1,47 +1,78 @@ from __future__ import annotations +import argparse import dataclasses from ldaptool._utils import argclasses +from . import _postprocess + + +def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: + parser.add_argument( + metavar="attributes", + dest=dest, + nargs="*", + help=""" + Attributes to lookup (and columns to display in tables). + Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn). + """, + ) + @dataclasses.dataclass(slots=True, kw_only=True) class Arguments(argclasses.BaseArguments): - json: bool = dataclasses.field( - default=False, - metadata=argclasses.arg(help="Use full json output"), - ) - human: bool = dataclasses.field( - default=False, - metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"), - ) + columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes)) + columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names + attributes: list[str] = dataclasses.field(default_factory=list) + human_separator: str = dataclasses.field( default=", ", metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"), ) + dateonly: bool = dataclasses.field( default=True, metadata=argclasses.arg(help="Use only date part of decoded timestamps"), ) - dndomain: bool = dataclasses.field( - default=False, - metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"), - ) - dnpath: bool = dataclasses.field( - default=False, - metadata=argclasses.arg( - help=""" - Whether to export a virtual dnpath attribute - ('/' joined values of reversed DN without DNS labels) - """ - ), - ) - dnfullpath: bool = dataclasses.field( - default=False, - metadata=argclasses.arg( - help=""" - Whether to export a virtual dnfullpath attribute - ('/' joined values of reversed DN; DNS domain as first label) - """ - ), - ) + + post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict) + + def __post_init__(self) -> None: + super(Arguments, self).__post_init__() # super() not working here, unclear why. + + # extract special attribute names + all_attributes = False + attributes_set: set[str] = set() + self.columns_keys = [] + for column in list(self.columns): + column = column.lower() + + if column == "*": + # '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing + self.columns.remove("*") + all_attributes = True + continue + + self.columns_keys.append(column) + + if column == "dndomain": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"]) + attributes_set.add("dn") + elif column == "dnpath": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"]) + attributes_set.add("dn") + elif column == "dnfullpath": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"]) + attributes_set.add("dn") + else: + step_names = column.split(":") + attributes_set.add(step_names[0]) + if len(step_names) > 1: + source = step_names.pop(0) + self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names) + + if all_attributes: + self.attributes = [] + else: + self.attributes = list(attributes_set) diff --git a/src/ldaptool/search/arguments.py b/src/ldaptool/search/arguments.py index 1eef170..8dc3ffb 100644 --- a/src/ldaptool/search/arguments.py +++ b/src/ldaptool/search/arguments.py @@ -1,6 +1,5 @@ from __future__ import annotations -import argparse import dataclasses import typing @@ -8,28 +7,8 @@ import ldaptool.decode.arguments from ldaptool._utils import argclasses -def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: - parser.add_argument( - metavar="attributes", - dest=dest, - nargs="*", - help=""" - Attributes to lookup (and columns to display in tables). - Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn). - """, - ) - - @dataclasses.dataclass(slots=True, kw_only=True) class Arguments(ldaptool.decode.arguments.Arguments): - # overwrite fields for fake attributes to remove them from argparse; - # we enable those based on the attribute list - dndomain: bool = False - dnpath: bool = False - dnfullpath: bool = False - - attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes)) - columns: list[str] = dataclasses.field(default_factory=list) filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter")) find: typing.Optional[str] = dataclasses.field( default=None, @@ -75,6 +54,8 @@ class Arguments(ldaptool.decode.arguments.Arguments): ) def __post_init__(self) -> None: + super(Arguments, self).__post_init__() # super() not working here, unclear why. + if not self.filter is None: if not self.find is None: raise SystemExit("Can't use both --find and --filter") @@ -86,19 +67,3 @@ class Arguments(ldaptool.decode.arguments.Arguments): else: # probably doesn't like empty filter? self.filter = "(objectClass=*)" - - # extract special attribute names - self.columns = self.attributes # use all names for columns (headings and their order) - attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name - # create fake attributes on demand - if attributes_set.pop("dndomain", ""): - self.dndomain = True - if attributes_set.pop("dnpath", ""): - self.dnpath = True - if attributes_set.pop("dnfullpath", ""): - self.dnfullpath = True - # store remaining attributes (with original case) - self.attributes = list(attributes_set.values()) - if self.columns and not self.attributes: - # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes - self.attributes = ["dn"]