diff --git a/src/ldaptool/_main.py b/src/ldaptool/_main.py index 4b396c0..b5c6dad 100644 --- a/src/ldaptool/_main.py +++ b/src/ldaptool/_main.py @@ -106,7 +106,10 @@ class _Context: self.config = search.Config.load() except Exception as e: raise SystemExit(f"config error: {e}") - self.arguments = arguments_p.from_args(args) + try: + self.arguments = arguments_p.from_args(args) + except decode.InvalidStep as e: + raise SystemExit(f"invalid arguments: {e}") def run(self) -> None: # starting the search sets the base we want to print diff --git a/src/ldaptool/decode/__init__.py b/src/ldaptool/decode/__init__.py index 423e59a..6d98867 100644 --- a/src/ldaptool/decode/__init__.py +++ b/src/ldaptool/decode/__init__.py @@ -1,10 +1,12 @@ from __future__ import annotations from ._decoder import Attribute, Decoder +from ._postprocess import InvalidStep from .arguments import Arguments __all__ = [ "Arguments", "Attribute", "Decoder", + "InvalidStep", ] diff --git a/src/ldaptool/decode/_decoder.py b/src/ldaptool/decode/_decoder.py index 031ccf8..28ef42c 100644 --- a/src/ldaptool/decode/_decoder.py +++ b/src/ldaptool/decode/_decoder.py @@ -8,8 +8,6 @@ import sys import typing import uuid -from ldaptool._utils.dninfo import DNInfo - from . import _types from .arguments import Arguments @@ -175,20 +173,20 @@ class Decoder: name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values] for name, raw_values in entry.items() } - if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath: - dninfo = DNInfo(dn=dn) - if self.arguments.dndomain: - decoded_entry["dndomain"] = [ - Attribute.fake_attribute("dndomain", dninfo.domain), - ] - if self.arguments.dnpath: - decoded_entry["dnpath"] = [ - Attribute.fake_attribute("dnpath", dninfo.path), - ] - if self.arguments.dnfullpath: - decoded_entry["dnfullpath"] = [ - Attribute.fake_attribute("dnfullpath", dninfo.full_path), + + for attr, post_processes in self.arguments.post_process.items(): + if attr == "dn": + values = [dn] + else: + attrs = decoded_entry.get(attr, None) + if attrs is None: + continue + values = [at.human() for at in attrs] + for column, post_process in post_processes.items(): + decoded_entry[column] = [ + Attribute.fake_attribute(column, post_process.process(value)) for value in values ] + return decoded_entry def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]: @@ -221,9 +219,22 @@ class Decoder: def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None: print(f"dn: {dn}", file=file) - for attrs in obj.values(): - for attr in attrs: - attr.print(file=file) + attrs: typing.Optional[list[Attribute]] + if not self.arguments.attributes: + # show all attributes - use order from server + for attrs in obj.values(): + for attr in attrs: + attr.print(file=file) + else: + # only selected columns; use given order + for column in self.arguments.columns_keys: + if column == "dn": + continue # already printed dn + attrs = obj.get(column, None) + if attrs is None: + continue + for attr in attrs: + attr.print(file=file) print(file=file) # separate entries with newlines def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None: diff --git a/src/ldaptool/decode/_postprocess.py b/src/ldaptool/decode/_postprocess.py new file mode 100644 index 0000000..d7529e6 --- /dev/null +++ b/src/ldaptool/decode/_postprocess.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import abc +import dataclasses + +from ldaptool._utils.dninfo import DNInfo + + +class Step(abc.ABC): + __slots__ = () + + @abc.abstractmethod + def step(self, value: str) -> str: + ... + + +@dataclasses.dataclass(slots=True) +class MaxLength(Step): + limit: int + + def step(self, value: str) -> str: + if not self.limit or len(value) <= self.limit: + return value + return value[: self.limit - 1] + "…" + + +@dataclasses.dataclass(slots=True) +class DNDomain(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.domain + + +@dataclasses.dataclass(slots=True) +class DNPath(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.path + + +@dataclasses.dataclass(slots=True) +class DNFullPath(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.full_path + + +_STEPS = { + "domain": DNDomain(), + "path": DNPath(), + "fullpath": DNFullPath(), +} + + +class InvalidStep(Exception): + pass + + +@dataclasses.dataclass(slots=True) +class PostProcess: + steps: list[Step] + + def process(self, value: str) -> str: + for step in self.steps: + value = step.step(value) + return value + + +def parse_steps(steps: list[str]) -> PostProcess: + max_len = 0 + try: + max_len = int(steps[-1]) + steps.pop() + except ValueError: + pass + result = [] + for step in steps: + step_i = _STEPS.get(step, None) + if step_i is None: + raise InvalidStep(f"Unknown post-processing step {step!r}") + result.append(step_i) + if max_len: + result.append(MaxLength(max_len)) + return PostProcess(result) diff --git a/src/ldaptool/decode/arguments.py b/src/ldaptool/decode/arguments.py index 524c691..90e324e 100644 --- a/src/ldaptool/decode/arguments.py +++ b/src/ldaptool/decode/arguments.py @@ -5,6 +5,8 @@ import dataclasses from ldaptool._utils import argclasses +from . import _postprocess + def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: parser.add_argument( @@ -21,6 +23,7 @@ def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: @dataclasses.dataclass(slots=True, kw_only=True) class Arguments(argclasses.BaseArguments): columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes)) + columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names attributes: list[str] = dataclasses.field(default_factory=list) human_separator: str = dataclasses.field( @@ -33,24 +36,43 @@ class Arguments(argclasses.BaseArguments): metadata=argclasses.arg(help="Use only date part of decoded timestamps"), ) - dndomain: bool = False - dnpath: bool = False - dnfullpath: bool = False + post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict) def __post_init__(self) -> None: super(Arguments, self).__post_init__() # super() not working here, unclear why. # extract special attribute names - attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.columns} # index by lowercase name - # create fake attributes on demand - if attributes_set.pop("dndomain", ""): - self.dndomain = True - if attributes_set.pop("dnpath", ""): - self.dnpath = True - if attributes_set.pop("dnfullpath", ""): - self.dnfullpath = True - # store remaining attributes (with original case) - self.attributes = list(attributes_set.values()) - if self.columns and not self.attributes: - # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes - self.attributes = ["dn"] + all_attributes = False + attributes_set: set[str] = set() + self.columns_keys = [] + for column in list(self.columns): + column = column.lower() + + if column == "*": + # '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing + self.columns.remove("*") + all_attributes = True + continue + + self.columns_keys.append(column) + + if column == "dndomain": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"]) + attributes_set.add("dn") + elif column == "dnpath": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"]) + attributes_set.add("dn") + elif column == "dnfullpath": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"]) + attributes_set.add("dn") + else: + step_names = column.split(":") + attributes_set.add(step_names[0]) + if len(step_names) > 1: + source = step_names.pop(0) + self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names) + + if all_attributes: + self.attributes = [] + else: + self.attributes = list(attributes_set)