Compare commits

...

3 Commits

7 changed files with 242 additions and 117 deletions

View File

@ -42,19 +42,27 @@ class Arguments(search.Arguments):
help="Markdown table output - requires list of attributes", help="Markdown table output - requires list of attributes",
), ),
) )
table_output: typing.Optional[TableOutput] = None
html: bool = dataclasses.field( html: bool = dataclasses.field(
default=False, default=False,
metadata=argclasses.arg( metadata=argclasses.arg(
help="HTML table output - requires list of attributes", help="HTML table output - requires list of attributes",
), ),
) )
table_output: typing.Optional[TableOutput] = None
sort: bool = dataclasses.field( sort: bool = dataclasses.field(
default=False, default=False,
metadata=argclasses.arg( metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given", help="Sorted table output - defaults to markdown --table unless --csv is given",
), ),
) )
json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use full json output"),
)
human: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
)
def __post_init__(self) -> None: def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why. super(Arguments, self).__post_init__() # super() not working here, unclear why.
@ -98,7 +106,10 @@ class _Context:
self.config = search.Config.load() self.config = search.Config.load()
except Exception as e: except Exception as e:
raise SystemExit(f"config error: {e}") raise SystemExit(f"config error: {e}")
try:
self.arguments = arguments_p.from_args(args) self.arguments = arguments_p.from_args(args)
except decode.InvalidStep as e:
raise SystemExit(f"invalid arguments: {e}")
def run(self) -> None: def run(self) -> None:
# starting the search sets the base we want to print # starting the search sets the base we want to print
@ -141,7 +152,7 @@ class _Context:
continue continue
# normal entry # normal entry
assert not isinstance(entry, list) assert not isinstance(entry, list)
obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry)) obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
yield tuple(obj.get(key, "") for key in column_keys) yield tuple(obj.get(key, "") for key in column_keys)
except SizeLimitExceeded as e: except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}") raise SystemExit(f"Error: {e}")
@ -201,8 +212,13 @@ class _Context:
# normal entry # normal entry
assert not isinstance(entry, list) assert not isinstance(entry, list)
num_entries += 1 num_entries += 1
obj = decoder.read(dn=dn, entry=entry) if ldif_output:
decoder.emit(dn=dn, entry=obj) decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
elif self.arguments.human:
decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
else:
assert self.arguments.json
decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
except SizeLimitExceeded as e: except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}") raise SystemExit(f"Error: {e}")

View File

@ -74,6 +74,9 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")
@dataclasses.dataclass(slots=True, kw_only=True) @dataclasses.dataclass(slots=True, kw_only=True)
class BaseArguments: class BaseArguments:
def __post_init__(self) -> None:
pass
@classmethod @classmethod
def add_fields_to_parser( def add_fields_to_parser(
cls: type[_TArgs], cls: type[_TArgs],

View File

@ -1,10 +1,12 @@
from __future__ import annotations from __future__ import annotations
from ._decoder import Attribute, Decoder from ._decoder import Attribute, Decoder
from ._postprocess import InvalidStep
from .arguments import Arguments from .arguments import Arguments
__all__ = [ __all__ = [
"Arguments", "Arguments",
"Attribute", "Attribute",
"Decoder", "Decoder",
"InvalidStep",
] ]

View File

@ -8,8 +8,6 @@ import sys
import typing import typing
import uuid import uuid
from ldaptool._utils.dninfo import DNInfo
from . import _types from . import _types
from .arguments import Arguments from .arguments import Arguments
@ -122,16 +120,16 @@ class Attribute:
def _base64_value(self) -> str: def _base64_value(self) -> str:
return base64.b64encode(self.raw).decode("ascii") return base64.b64encode(self.raw).decode("ascii")
def print(self) -> None: def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
if not self.decoded is None: if not self.decoded is None:
comment = self.utf8_clean comment = self.utf8_clean
if comment is None: if comment is None:
comment = self._base64_value comment = self._base64_value
print(f"{self.name}: {self.decoded} # {comment}") print(f"{self.name}: {self.decoded} # {comment}", file=file)
elif not self.utf8_clean is None: elif not self.utf8_clean is None:
print(f"{self.name}: {self.utf8_clean}") print(f"{self.name}: {self.utf8_clean}", file=file)
else: else:
print(f"{self.name}:: {self._base64_value}") print(f"{self.name}:: {self._base64_value}", file=file)
def to_json(self) -> dict[str, typing.Any]: def to_json(self) -> dict[str, typing.Any]:
item: dict[str, typing.Any] = {} item: dict[str, typing.Any] = {}
@ -175,55 +173,69 @@ class Decoder:
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values] name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
for name, raw_values in entry.items() for name, raw_values in entry.items()
} }
if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
dninfo = DNInfo(dn=dn) for attr, post_processes in self.arguments.post_process.items():
if self.arguments.dndomain: if attr == "dn":
decoded_entry["dndomain"] = [ values = [dn]
Attribute.fake_attribute("dndomain", dninfo.domain), else:
] attrs = decoded_entry.get(attr, None)
if self.arguments.dnpath: if attrs is None:
decoded_entry["dnpath"] = [ continue
Attribute.fake_attribute("dnpath", dninfo.path), values = [at.human() for at in attrs]
] for column, post_process in post_processes.items():
if self.arguments.dnfullpath: decoded_entry[column] = [
decoded_entry["dnfullpath"] = [ Attribute.fake_attribute(column, post_process.process(value)) for value in values
Attribute.fake_attribute("dnfullpath", dninfo.full_path),
] ]
return decoded_entry return decoded_entry
def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]: def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn) emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in entry.items(): for name, attrs in obj.items():
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs) emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
return emit return emit
def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]: def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
emit = self.human(dn=dn, obj=obj)
json.dump(emit, file, ensure_ascii=False)
print(file=file) # terminate output dicts by newline
def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn) emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in entry.items(): for name, attrs in obj.items():
emit[name] = [attr.to_json() for attr in attrs] emit[name] = [attr.to_json() for attr in attrs]
return emit return emit
def _emit_json(self, *, dn: str, entry: TDecoded) -> None: def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
if self.arguments.human: emit = self.json(dn=dn, obj=obj)
emit = self.human(dn=dn, entry=entry) json.dump(emit, file, ensure_ascii=False)
else: print(file=file) # terminate output dicts by newline
emit = self.json(dn=dn, entry=entry)
json.dump(emit, sys.stdout, ensure_ascii=False)
print() # terminate output dicts by newline
def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None: def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
print(f"dn: {dn}") self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
for attrs in entry.values():
def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
print(f"dn: {dn}", file=file)
attrs: typing.Optional[list[Attribute]]
if not self.arguments.attributes:
# show all attributes - use order from server
for attrs in obj.values():
for attr in attrs: for attr in attrs:
attr.print() attr.print(file=file)
print() # separate entries with newlines
def emit(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human or self.arguments.json:
self._emit_json(dn=dn, entry=entry)
else: else:
self._emit_ldif(dn=dn, entry=entry) # only selected columns; use given order
for column in self.arguments.columns_keys:
if column == "dn":
continue # already printed dn
attrs = obj.get(column, None)
if attrs is None:
continue
for attr in attrs:
attr.print(file=file)
print(file=file) # separate entries with newlines
def handle(self, *, dn: str, entry: TEntry) -> None: def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
entry_attrs = self.read(dn=dn, entry=entry) self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
self.emit(dn=dn, entry=entry_attrs)

View File

@ -0,0 +1,96 @@
from __future__ import annotations
import abc
import dataclasses
from ldaptool._utils.dninfo import DNInfo
class Step(abc.ABC):
__slots__ = ()
@abc.abstractmethod
def step(self, value: str) -> str:
...
@dataclasses.dataclass(slots=True)
class MaxLength(Step):
limit: int
def step(self, value: str) -> str:
if not self.limit or len(value) <= self.limit:
return value
return value[: self.limit - 1] + ""
@dataclasses.dataclass(slots=True)
class DNDomain(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.domain
@dataclasses.dataclass(slots=True)
class DNPath(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.path
@dataclasses.dataclass(slots=True)
class DNFullPath(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.full_path
_STEPS = {
"domain": DNDomain(),
"path": DNPath(),
"fullpath": DNFullPath(),
}
class InvalidStep(Exception):
pass
@dataclasses.dataclass(slots=True)
class PostProcess:
steps: list[Step]
def process(self, value: str) -> str:
for step in self.steps:
value = step.step(value)
return value
def parse_steps(steps: list[str]) -> PostProcess:
max_len = 0
try:
max_len = int(steps[-1])
steps.pop()
except ValueError:
pass
result = []
for step in steps:
step_i = _STEPS.get(step, None)
if step_i is None:
raise InvalidStep(f"Unknown post-processing step {step!r}")
result.append(step_i)
if max_len:
result.append(MaxLength(max_len))
return PostProcess(result)

View File

@ -1,47 +1,78 @@
from __future__ import annotations from __future__ import annotations
import argparse
import dataclasses import dataclasses
from ldaptool._utils import argclasses from ldaptool._utils import argclasses
from . import _postprocess
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True) @dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(argclasses.BaseArguments): class Arguments(argclasses.BaseArguments):
json: bool = dataclasses.field( columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
default=False, columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names
metadata=argclasses.arg(help="Use full json output"), attributes: list[str] = dataclasses.field(default_factory=list)
)
human: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
)
human_separator: str = dataclasses.field( human_separator: str = dataclasses.field(
default=", ", default=", ",
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"), metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
) )
dateonly: bool = dataclasses.field( dateonly: bool = dataclasses.field(
default=True, default=True,
metadata=argclasses.arg(help="Use only date part of decoded timestamps"), metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
) )
dndomain: bool = dataclasses.field(
default=False, post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)
metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"),
) def __post_init__(self) -> None:
dnpath: bool = dataclasses.field( super(Arguments, self).__post_init__() # super() not working here, unclear why.
default=False,
metadata=argclasses.arg( # extract special attribute names
help=""" all_attributes = False
Whether to export a virtual dnpath attribute attributes_set: set[str] = set()
('/' joined values of reversed DN without DNS labels) self.columns_keys = []
""" for column in list(self.columns):
), column = column.lower()
)
dnfullpath: bool = dataclasses.field( if column == "*":
default=False, # '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
metadata=argclasses.arg( self.columns.remove("*")
help=""" all_attributes = True
Whether to export a virtual dnfullpath attribute continue
('/' joined values of reversed DN; DNS domain as first label)
""" self.columns_keys.append(column)
),
) if column == "dndomain":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"])
attributes_set.add("dn")
elif column == "dnpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"])
attributes_set.add("dn")
elif column == "dnfullpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"])
attributes_set.add("dn")
else:
step_names = column.split(":")
attributes_set.add(step_names[0])
if len(step_names) > 1:
source = step_names.pop(0)
self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names)
if all_attributes:
self.attributes = []
else:
self.attributes = list(attributes_set)

View File

@ -1,6 +1,5 @@
from __future__ import annotations from __future__ import annotations
import argparse
import dataclasses import dataclasses
import typing import typing
@ -8,28 +7,8 @@ import ldaptool.decode.arguments
from ldaptool._utils import argclasses from ldaptool._utils import argclasses
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True) @dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(ldaptool.decode.arguments.Arguments): class Arguments(ldaptool.decode.arguments.Arguments):
# overwrite fields for fake attributes to remove them from argparse;
# we enable those based on the attribute list
dndomain: bool = False
dnpath: bool = False
dnfullpath: bool = False
attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
columns: list[str] = dataclasses.field(default_factory=list)
filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter")) filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
find: typing.Optional[str] = dataclasses.field( find: typing.Optional[str] = dataclasses.field(
default=None, default=None,
@ -75,6 +54,8 @@ class Arguments(ldaptool.decode.arguments.Arguments):
) )
def __post_init__(self) -> None: def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
if not self.filter is None: if not self.filter is None:
if not self.find is None: if not self.find is None:
raise SystemExit("Can't use both --find and --filter") raise SystemExit("Can't use both --find and --filter")
@ -86,19 +67,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):
else: else:
# probably doesn't like empty filter? # probably doesn't like empty filter?
self.filter = "(objectClass=*)" self.filter = "(objectClass=*)"
# extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
# create fake attributes on demand
if attributes_set.pop("dndomain", ""):
self.dndomain = True
if attributes_set.pop("dnpath", ""):
self.dnpath = True
if attributes_set.pop("dnfullpath", ""):
self.dnfullpath = True
# store remaining attributes (with original case)
self.attributes = list(attributes_set.values())
if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"]