From c03374d6df49b7f7a1f2d77c03f79761912c2f1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Fri, 28 Apr 2023 19:44:13 +0200 Subject: [PATCH 1/4] move argument/column handling to decoder (prepare for more post-processing in decoder) --- src/ldaptool/_utils/argclasses.py | 3 ++ src/ldaptool/decode/arguments.py | 61 ++++++++++++++++++++----------- src/ldaptool/search/arguments.py | 39 +------------------- 3 files changed, 44 insertions(+), 59 deletions(-) diff --git a/src/ldaptool/_utils/argclasses.py b/src/ldaptool/_utils/argclasses.py index 7c65fb4..14356fa 100644 --- a/src/ldaptool/_utils/argclasses.py +++ b/src/ldaptool/_utils/argclasses.py @@ -74,6 +74,9 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments") @dataclasses.dataclass(slots=True, kw_only=True) class BaseArguments: + def __post_init__(self) -> None: + pass + @classmethod def add_fields_to_parser( cls: type[_TArgs], diff --git a/src/ldaptool/decode/arguments.py b/src/ldaptool/decode/arguments.py index f1e44aa..c4ea92e 100644 --- a/src/ldaptool/decode/arguments.py +++ b/src/ldaptool/decode/arguments.py @@ -1,12 +1,28 @@ from __future__ import annotations +import argparse import dataclasses from ldaptool._utils import argclasses +def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: + parser.add_argument( + metavar="attributes", + dest=dest, + nargs="*", + help=""" + Attributes to lookup (and columns to display in tables). + Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn). + """, + ) + + @dataclasses.dataclass(slots=True, kw_only=True) class Arguments(argclasses.BaseArguments): + columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes)) + attributes: list[str] = dataclasses.field(default_factory=list) + json: bool = dataclasses.field( default=False, metadata=argclasses.arg(help="Use full json output"), @@ -19,29 +35,30 @@ class Arguments(argclasses.BaseArguments): default=", ", metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"), ) + dateonly: bool = dataclasses.field( default=True, metadata=argclasses.arg(help="Use only date part of decoded timestamps"), ) - dndomain: bool = dataclasses.field( - default=False, - metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"), - ) - dnpath: bool = dataclasses.field( - default=False, - metadata=argclasses.arg( - help=""" - Whether to export a virtual dnpath attribute - ('/' joined values of reversed DN without DNS labels) - """ - ), - ) - dnfullpath: bool = dataclasses.field( - default=False, - metadata=argclasses.arg( - help=""" - Whether to export a virtual dnfullpath attribute - ('/' joined values of reversed DN; DNS domain as first label) - """ - ), - ) + + dndomain: bool = False + dnpath: bool = False + dnfullpath: bool = False + + def __post_init__(self) -> None: + super(Arguments, self).__post_init__() # super() not working here, unclear why. + + # extract special attribute names + attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.columns} # index by lowercase name + # create fake attributes on demand + if attributes_set.pop("dndomain", ""): + self.dndomain = True + if attributes_set.pop("dnpath", ""): + self.dnpath = True + if attributes_set.pop("dnfullpath", ""): + self.dnfullpath = True + # store remaining attributes (with original case) + self.attributes = list(attributes_set.values()) + if self.columns and not self.attributes: + # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes + self.attributes = ["dn"] diff --git a/src/ldaptool/search/arguments.py b/src/ldaptool/search/arguments.py index 1eef170..8dc3ffb 100644 --- a/src/ldaptool/search/arguments.py +++ b/src/ldaptool/search/arguments.py @@ -1,6 +1,5 @@ from __future__ import annotations -import argparse import dataclasses import typing @@ -8,28 +7,8 @@ import ldaptool.decode.arguments from ldaptool._utils import argclasses -def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: - parser.add_argument( - metavar="attributes", - dest=dest, - nargs="*", - help=""" - Attributes to lookup (and columns to display in tables). - Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn). - """, - ) - - @dataclasses.dataclass(slots=True, kw_only=True) class Arguments(ldaptool.decode.arguments.Arguments): - # overwrite fields for fake attributes to remove them from argparse; - # we enable those based on the attribute list - dndomain: bool = False - dnpath: bool = False - dnfullpath: bool = False - - attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes)) - columns: list[str] = dataclasses.field(default_factory=list) filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter")) find: typing.Optional[str] = dataclasses.field( default=None, @@ -75,6 +54,8 @@ class Arguments(ldaptool.decode.arguments.Arguments): ) def __post_init__(self) -> None: + super(Arguments, self).__post_init__() # super() not working here, unclear why. + if not self.filter is None: if not self.find is None: raise SystemExit("Can't use both --find and --filter") @@ -86,19 +67,3 @@ class Arguments(ldaptool.decode.arguments.Arguments): else: # probably doesn't like empty filter? self.filter = "(objectClass=*)" - - # extract special attribute names - self.columns = self.attributes # use all names for columns (headings and their order) - attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name - # create fake attributes on demand - if attributes_set.pop("dndomain", ""): - self.dndomain = True - if attributes_set.pop("dnpath", ""): - self.dnpath = True - if attributes_set.pop("dnfullpath", ""): - self.dnfullpath = True - # store remaining attributes (with original case) - self.attributes = list(attributes_set.values()) - if self.columns and not self.attributes: - # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes - self.attributes = ["dn"] From bc1eb6573815e318041647fa220451cd028edeab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Fri, 28 Apr 2023 19:58:27 +0200 Subject: [PATCH 2/4] move json output format handling to main tool from decoder --- src/ldaptool/_main.py | 21 +++++++++--- src/ldaptool/decode/_decoder.py | 59 ++++++++++++++++---------------- src/ldaptool/decode/arguments.py | 8 ----- 3 files changed, 47 insertions(+), 41 deletions(-) diff --git a/src/ldaptool/_main.py b/src/ldaptool/_main.py index 60f081e..4b396c0 100644 --- a/src/ldaptool/_main.py +++ b/src/ldaptool/_main.py @@ -42,19 +42,27 @@ class Arguments(search.Arguments): help="Markdown table output - requires list of attributes", ), ) - table_output: typing.Optional[TableOutput] = None html: bool = dataclasses.field( default=False, metadata=argclasses.arg( help="HTML table output - requires list of attributes", ), ) + table_output: typing.Optional[TableOutput] = None sort: bool = dataclasses.field( default=False, metadata=argclasses.arg( help="Sorted table output - defaults to markdown --table unless --csv is given", ), ) + json: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="Use full json output"), + ) + human: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"), + ) def __post_init__(self) -> None: super(Arguments, self).__post_init__() # super() not working here, unclear why. @@ -141,7 +149,7 @@ class _Context: continue # normal entry assert not isinstance(entry, list) - obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry)) + obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry)) yield tuple(obj.get(key, "") for key in column_keys) except SizeLimitExceeded as e: raise SystemExit(f"Error: {e}") @@ -201,8 +209,13 @@ class _Context: # normal entry assert not isinstance(entry, list) num_entries += 1 - obj = decoder.read(dn=dn, entry=entry) - decoder.emit(dn=dn, entry=obj) + if ldif_output: + decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream) + elif self.arguments.human: + decoder.read_and_emit_human(dn=dn, entry=entry, file=stream) + else: + assert self.arguments.json + decoder.read_and_emit_json(dn=dn, entry=entry, file=stream) except SizeLimitExceeded as e: raise SystemExit(f"Error: {e}") diff --git a/src/ldaptool/decode/_decoder.py b/src/ldaptool/decode/_decoder.py index 128036b..031ccf8 100644 --- a/src/ldaptool/decode/_decoder.py +++ b/src/ldaptool/decode/_decoder.py @@ -122,16 +122,16 @@ class Attribute: def _base64_value(self) -> str: return base64.b64encode(self.raw).decode("ascii") - def print(self) -> None: + def print(self, *, file: typing.IO[str] = sys.stdout) -> None: if not self.decoded is None: comment = self.utf8_clean if comment is None: comment = self._base64_value - print(f"{self.name}: {self.decoded} # {comment}") + print(f"{self.name}: {self.decoded} # {comment}", file=file) elif not self.utf8_clean is None: - print(f"{self.name}: {self.utf8_clean}") + print(f"{self.name}: {self.utf8_clean}", file=file) else: - print(f"{self.name}:: {self._base64_value}") + print(f"{self.name}:: {self._base64_value}", file=file) def to_json(self) -> dict[str, typing.Any]: item: dict[str, typing.Any] = {} @@ -191,39 +191,40 @@ class Decoder: ] return decoded_entry - def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]: + def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]: emit: dict[str, typing.Any] = dict(dn=dn) - for name, attrs in entry.items(): + for name, attrs in obj.items(): emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs) return emit - def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]: + def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None: + emit = self.human(dn=dn, obj=obj) + json.dump(emit, file, ensure_ascii=False) + print(file=file) # terminate output dicts by newline + + def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None: + self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file) + + def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]: emit: dict[str, typing.Any] = dict(dn=dn) - for name, attrs in entry.items(): + for name, attrs in obj.items(): emit[name] = [attr.to_json() for attr in attrs] return emit - def _emit_json(self, *, dn: str, entry: TDecoded) -> None: - if self.arguments.human: - emit = self.human(dn=dn, entry=entry) - else: - emit = self.json(dn=dn, entry=entry) - json.dump(emit, sys.stdout, ensure_ascii=False) - print() # terminate output dicts by newline + def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None: + emit = self.json(dn=dn, obj=obj) + json.dump(emit, file, ensure_ascii=False) + print(file=file) # terminate output dicts by newline - def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None: - print(f"dn: {dn}") - for attrs in entry.values(): + def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None: + self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file) + + def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None: + print(f"dn: {dn}", file=file) + for attrs in obj.values(): for attr in attrs: - attr.print() - print() # separate entries with newlines + attr.print(file=file) + print(file=file) # separate entries with newlines - def emit(self, *, dn: str, entry: TDecoded) -> None: - if self.arguments.human or self.arguments.json: - self._emit_json(dn=dn, entry=entry) - else: - self._emit_ldif(dn=dn, entry=entry) - - def handle(self, *, dn: str, entry: TEntry) -> None: - entry_attrs = self.read(dn=dn, entry=entry) - self.emit(dn=dn, entry=entry_attrs) + def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None: + self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file) diff --git a/src/ldaptool/decode/arguments.py b/src/ldaptool/decode/arguments.py index c4ea92e..524c691 100644 --- a/src/ldaptool/decode/arguments.py +++ b/src/ldaptool/decode/arguments.py @@ -23,14 +23,6 @@ class Arguments(argclasses.BaseArguments): columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes)) attributes: list[str] = dataclasses.field(default_factory=list) - json: bool = dataclasses.field( - default=False, - metadata=argclasses.arg(help="Use full json output"), - ) - human: bool = dataclasses.field( - default=False, - metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"), - ) human_separator: str = dataclasses.field( default=", ", metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"), From cd7cfe451c162d2e1484c4a0a1eebb75bcd8fdd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Fri, 28 Apr 2023 20:48:36 +0200 Subject: [PATCH 3/4] support attribute post-processing; :, and DN :domain, :path, :fullpath --- src/ldaptool/_main.py | 5 +- src/ldaptool/decode/__init__.py | 2 + src/ldaptool/decode/_decoder.py | 47 ++++++++------ src/ldaptool/decode/_postprocess.py | 96 +++++++++++++++++++++++++++++ src/ldaptool/decode/arguments.py | 54 +++++++++++----- 5 files changed, 169 insertions(+), 35 deletions(-) create mode 100644 src/ldaptool/decode/_postprocess.py diff --git a/src/ldaptool/_main.py b/src/ldaptool/_main.py index 4b396c0..b5c6dad 100644 --- a/src/ldaptool/_main.py +++ b/src/ldaptool/_main.py @@ -106,7 +106,10 @@ class _Context: self.config = search.Config.load() except Exception as e: raise SystemExit(f"config error: {e}") - self.arguments = arguments_p.from_args(args) + try: + self.arguments = arguments_p.from_args(args) + except decode.InvalidStep as e: + raise SystemExit(f"invalid arguments: {e}") def run(self) -> None: # starting the search sets the base we want to print diff --git a/src/ldaptool/decode/__init__.py b/src/ldaptool/decode/__init__.py index 423e59a..6d98867 100644 --- a/src/ldaptool/decode/__init__.py +++ b/src/ldaptool/decode/__init__.py @@ -1,10 +1,12 @@ from __future__ import annotations from ._decoder import Attribute, Decoder +from ._postprocess import InvalidStep from .arguments import Arguments __all__ = [ "Arguments", "Attribute", "Decoder", + "InvalidStep", ] diff --git a/src/ldaptool/decode/_decoder.py b/src/ldaptool/decode/_decoder.py index 031ccf8..28ef42c 100644 --- a/src/ldaptool/decode/_decoder.py +++ b/src/ldaptool/decode/_decoder.py @@ -8,8 +8,6 @@ import sys import typing import uuid -from ldaptool._utils.dninfo import DNInfo - from . import _types from .arguments import Arguments @@ -175,20 +173,20 @@ class Decoder: name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values] for name, raw_values in entry.items() } - if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath: - dninfo = DNInfo(dn=dn) - if self.arguments.dndomain: - decoded_entry["dndomain"] = [ - Attribute.fake_attribute("dndomain", dninfo.domain), - ] - if self.arguments.dnpath: - decoded_entry["dnpath"] = [ - Attribute.fake_attribute("dnpath", dninfo.path), - ] - if self.arguments.dnfullpath: - decoded_entry["dnfullpath"] = [ - Attribute.fake_attribute("dnfullpath", dninfo.full_path), + + for attr, post_processes in self.arguments.post_process.items(): + if attr == "dn": + values = [dn] + else: + attrs = decoded_entry.get(attr, None) + if attrs is None: + continue + values = [at.human() for at in attrs] + for column, post_process in post_processes.items(): + decoded_entry[column] = [ + Attribute.fake_attribute(column, post_process.process(value)) for value in values ] + return decoded_entry def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]: @@ -221,9 +219,22 @@ class Decoder: def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None: print(f"dn: {dn}", file=file) - for attrs in obj.values(): - for attr in attrs: - attr.print(file=file) + attrs: typing.Optional[list[Attribute]] + if not self.arguments.attributes: + # show all attributes - use order from server + for attrs in obj.values(): + for attr in attrs: + attr.print(file=file) + else: + # only selected columns; use given order + for column in self.arguments.columns_keys: + if column == "dn": + continue # already printed dn + attrs = obj.get(column, None) + if attrs is None: + continue + for attr in attrs: + attr.print(file=file) print(file=file) # separate entries with newlines def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None: diff --git a/src/ldaptool/decode/_postprocess.py b/src/ldaptool/decode/_postprocess.py new file mode 100644 index 0000000..d7529e6 --- /dev/null +++ b/src/ldaptool/decode/_postprocess.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import abc +import dataclasses + +from ldaptool._utils.dninfo import DNInfo + + +class Step(abc.ABC): + __slots__ = () + + @abc.abstractmethod + def step(self, value: str) -> str: + ... + + +@dataclasses.dataclass(slots=True) +class MaxLength(Step): + limit: int + + def step(self, value: str) -> str: + if not self.limit or len(value) <= self.limit: + return value + return value[: self.limit - 1] + "…" + + +@dataclasses.dataclass(slots=True) +class DNDomain(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.domain + + +@dataclasses.dataclass(slots=True) +class DNPath(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.path + + +@dataclasses.dataclass(slots=True) +class DNFullPath(Step): + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return dninfo.full_path + + +_STEPS = { + "domain": DNDomain(), + "path": DNPath(), + "fullpath": DNFullPath(), +} + + +class InvalidStep(Exception): + pass + + +@dataclasses.dataclass(slots=True) +class PostProcess: + steps: list[Step] + + def process(self, value: str) -> str: + for step in self.steps: + value = step.step(value) + return value + + +def parse_steps(steps: list[str]) -> PostProcess: + max_len = 0 + try: + max_len = int(steps[-1]) + steps.pop() + except ValueError: + pass + result = [] + for step in steps: + step_i = _STEPS.get(step, None) + if step_i is None: + raise InvalidStep(f"Unknown post-processing step {step!r}") + result.append(step_i) + if max_len: + result.append(MaxLength(max_len)) + return PostProcess(result) diff --git a/src/ldaptool/decode/arguments.py b/src/ldaptool/decode/arguments.py index 524c691..90e324e 100644 --- a/src/ldaptool/decode/arguments.py +++ b/src/ldaptool/decode/arguments.py @@ -5,6 +5,8 @@ import dataclasses from ldaptool._utils import argclasses +from . import _postprocess + def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: parser.add_argument( @@ -21,6 +23,7 @@ def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: @dataclasses.dataclass(slots=True, kw_only=True) class Arguments(argclasses.BaseArguments): columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes)) + columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names attributes: list[str] = dataclasses.field(default_factory=list) human_separator: str = dataclasses.field( @@ -33,24 +36,43 @@ class Arguments(argclasses.BaseArguments): metadata=argclasses.arg(help="Use only date part of decoded timestamps"), ) - dndomain: bool = False - dnpath: bool = False - dnfullpath: bool = False + post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict) def __post_init__(self) -> None: super(Arguments, self).__post_init__() # super() not working here, unclear why. # extract special attribute names - attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.columns} # index by lowercase name - # create fake attributes on demand - if attributes_set.pop("dndomain", ""): - self.dndomain = True - if attributes_set.pop("dnpath", ""): - self.dnpath = True - if attributes_set.pop("dnfullpath", ""): - self.dnfullpath = True - # store remaining attributes (with original case) - self.attributes = list(attributes_set.values()) - if self.columns and not self.attributes: - # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes - self.attributes = ["dn"] + all_attributes = False + attributes_set: set[str] = set() + self.columns_keys = [] + for column in list(self.columns): + column = column.lower() + + if column == "*": + # '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing + self.columns.remove("*") + all_attributes = True + continue + + self.columns_keys.append(column) + + if column == "dndomain": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"]) + attributes_set.add("dn") + elif column == "dnpath": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"]) + attributes_set.add("dn") + elif column == "dnfullpath": + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"]) + attributes_set.add("dn") + else: + step_names = column.split(":") + attributes_set.add(step_names[0]) + if len(step_names) > 1: + source = step_names.pop(0) + self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names) + + if all_attributes: + self.attributes = [] + else: + self.attributes = list(attributes_set) From 357b1ae9cb8ab112886416445ec0a4be7e251579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Tue, 2 May 2023 16:32:02 +0200 Subject: [PATCH 4/4] use Enum instead of StrEnum for python3.10 --- src/ldaptool/_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ldaptool/_main.py b/src/ldaptool/_main.py index b5c6dad..38fabb0 100644 --- a/src/ldaptool/_main.py +++ b/src/ldaptool/_main.py @@ -14,7 +14,7 @@ from ldaptool._utils import argclasses from ldaptool._utils.ldap import Result, SizeLimitExceeded -class TableOutput(enum.StrEnum): +class TableOutput(enum.Enum): MARKDOWN = "markdown" CSV = "csv" HTML = "html"