support attribute post-processing; :<len>, and DN :domain, :path, :fullpath
This commit is contained in:
parent
bc1eb65738
commit
cd7cfe451c
@ -106,7 +106,10 @@ class _Context:
|
||||
self.config = search.Config.load()
|
||||
except Exception as e:
|
||||
raise SystemExit(f"config error: {e}")
|
||||
try:
|
||||
self.arguments = arguments_p.from_args(args)
|
||||
except decode.InvalidStep as e:
|
||||
raise SystemExit(f"invalid arguments: {e}")
|
||||
|
||||
def run(self) -> None:
|
||||
# starting the search sets the base we want to print
|
||||
|
@ -1,10 +1,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from ._decoder import Attribute, Decoder
|
||||
from ._postprocess import InvalidStep
|
||||
from .arguments import Arguments
|
||||
|
||||
__all__ = [
|
||||
"Arguments",
|
||||
"Attribute",
|
||||
"Decoder",
|
||||
"InvalidStep",
|
||||
]
|
||||
|
@ -8,8 +8,6 @@ import sys
|
||||
import typing
|
||||
import uuid
|
||||
|
||||
from ldaptool._utils.dninfo import DNInfo
|
||||
|
||||
from . import _types
|
||||
from .arguments import Arguments
|
||||
|
||||
@ -175,20 +173,20 @@ class Decoder:
|
||||
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
|
||||
for name, raw_values in entry.items()
|
||||
}
|
||||
if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
|
||||
dninfo = DNInfo(dn=dn)
|
||||
if self.arguments.dndomain:
|
||||
decoded_entry["dndomain"] = [
|
||||
Attribute.fake_attribute("dndomain", dninfo.domain),
|
||||
]
|
||||
if self.arguments.dnpath:
|
||||
decoded_entry["dnpath"] = [
|
||||
Attribute.fake_attribute("dnpath", dninfo.path),
|
||||
]
|
||||
if self.arguments.dnfullpath:
|
||||
decoded_entry["dnfullpath"] = [
|
||||
Attribute.fake_attribute("dnfullpath", dninfo.full_path),
|
||||
|
||||
for attr, post_processes in self.arguments.post_process.items():
|
||||
if attr == "dn":
|
||||
values = [dn]
|
||||
else:
|
||||
attrs = decoded_entry.get(attr, None)
|
||||
if attrs is None:
|
||||
continue
|
||||
values = [at.human() for at in attrs]
|
||||
for column, post_process in post_processes.items():
|
||||
decoded_entry[column] = [
|
||||
Attribute.fake_attribute(column, post_process.process(value)) for value in values
|
||||
]
|
||||
|
||||
return decoded_entry
|
||||
|
||||
def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
|
||||
@ -221,9 +219,22 @@ class Decoder:
|
||||
|
||||
def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
|
||||
print(f"dn: {dn}", file=file)
|
||||
attrs: typing.Optional[list[Attribute]]
|
||||
if not self.arguments.attributes:
|
||||
# show all attributes - use order from server
|
||||
for attrs in obj.values():
|
||||
for attr in attrs:
|
||||
attr.print(file=file)
|
||||
else:
|
||||
# only selected columns; use given order
|
||||
for column in self.arguments.columns_keys:
|
||||
if column == "dn":
|
||||
continue # already printed dn
|
||||
attrs = obj.get(column, None)
|
||||
if attrs is None:
|
||||
continue
|
||||
for attr in attrs:
|
||||
attr.print(file=file)
|
||||
print(file=file) # separate entries with newlines
|
||||
|
||||
def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
|
||||
|
96
src/ldaptool/decode/_postprocess.py
Normal file
96
src/ldaptool/decode/_postprocess.py
Normal file
@ -0,0 +1,96 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import dataclasses
|
||||
|
||||
from ldaptool._utils.dninfo import DNInfo
|
||||
|
||||
|
||||
class Step(abc.ABC):
|
||||
__slots__ = ()
|
||||
|
||||
@abc.abstractmethod
|
||||
def step(self, value: str) -> str:
|
||||
...
|
||||
|
||||
|
||||
@dataclasses.dataclass(slots=True)
|
||||
class MaxLength(Step):
|
||||
limit: int
|
||||
|
||||
def step(self, value: str) -> str:
|
||||
if not self.limit or len(value) <= self.limit:
|
||||
return value
|
||||
return value[: self.limit - 1] + "…"
|
||||
|
||||
|
||||
@dataclasses.dataclass(slots=True)
|
||||
class DNDomain(Step):
|
||||
def step(self, value: str) -> str:
|
||||
try:
|
||||
dninfo = DNInfo(dn=value)
|
||||
except Exception:
|
||||
# not a valid DN -> no processing
|
||||
return value
|
||||
return dninfo.domain
|
||||
|
||||
|
||||
@dataclasses.dataclass(slots=True)
|
||||
class DNPath(Step):
|
||||
def step(self, value: str) -> str:
|
||||
try:
|
||||
dninfo = DNInfo(dn=value)
|
||||
except Exception:
|
||||
# not a valid DN -> no processing
|
||||
return value
|
||||
return dninfo.path
|
||||
|
||||
|
||||
@dataclasses.dataclass(slots=True)
|
||||
class DNFullPath(Step):
|
||||
def step(self, value: str) -> str:
|
||||
try:
|
||||
dninfo = DNInfo(dn=value)
|
||||
except Exception:
|
||||
# not a valid DN -> no processing
|
||||
return value
|
||||
return dninfo.full_path
|
||||
|
||||
|
||||
_STEPS = {
|
||||
"domain": DNDomain(),
|
||||
"path": DNPath(),
|
||||
"fullpath": DNFullPath(),
|
||||
}
|
||||
|
||||
|
||||
class InvalidStep(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@dataclasses.dataclass(slots=True)
|
||||
class PostProcess:
|
||||
steps: list[Step]
|
||||
|
||||
def process(self, value: str) -> str:
|
||||
for step in self.steps:
|
||||
value = step.step(value)
|
||||
return value
|
||||
|
||||
|
||||
def parse_steps(steps: list[str]) -> PostProcess:
|
||||
max_len = 0
|
||||
try:
|
||||
max_len = int(steps[-1])
|
||||
steps.pop()
|
||||
except ValueError:
|
||||
pass
|
||||
result = []
|
||||
for step in steps:
|
||||
step_i = _STEPS.get(step, None)
|
||||
if step_i is None:
|
||||
raise InvalidStep(f"Unknown post-processing step {step!r}")
|
||||
result.append(step_i)
|
||||
if max_len:
|
||||
result.append(MaxLength(max_len))
|
||||
return PostProcess(result)
|
@ -5,6 +5,8 @@ import dataclasses
|
||||
|
||||
from ldaptool._utils import argclasses
|
||||
|
||||
from . import _postprocess
|
||||
|
||||
|
||||
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
|
||||
parser.add_argument(
|
||||
@ -21,6 +23,7 @@ def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
|
||||
@dataclasses.dataclass(slots=True, kw_only=True)
|
||||
class Arguments(argclasses.BaseArguments):
|
||||
columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
|
||||
columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names
|
||||
attributes: list[str] = dataclasses.field(default_factory=list)
|
||||
|
||||
human_separator: str = dataclasses.field(
|
||||
@ -33,24 +36,43 @@ class Arguments(argclasses.BaseArguments):
|
||||
metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
|
||||
)
|
||||
|
||||
dndomain: bool = False
|
||||
dnpath: bool = False
|
||||
dnfullpath: bool = False
|
||||
post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
super(Arguments, self).__post_init__() # super() not working here, unclear why.
|
||||
|
||||
# extract special attribute names
|
||||
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.columns} # index by lowercase name
|
||||
# create fake attributes on demand
|
||||
if attributes_set.pop("dndomain", ""):
|
||||
self.dndomain = True
|
||||
if attributes_set.pop("dnpath", ""):
|
||||
self.dnpath = True
|
||||
if attributes_set.pop("dnfullpath", ""):
|
||||
self.dnfullpath = True
|
||||
# store remaining attributes (with original case)
|
||||
self.attributes = list(attributes_set.values())
|
||||
if self.columns and not self.attributes:
|
||||
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
|
||||
self.attributes = ["dn"]
|
||||
all_attributes = False
|
||||
attributes_set: set[str] = set()
|
||||
self.columns_keys = []
|
||||
for column in list(self.columns):
|
||||
column = column.lower()
|
||||
|
||||
if column == "*":
|
||||
# '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
|
||||
self.columns.remove("*")
|
||||
all_attributes = True
|
||||
continue
|
||||
|
||||
self.columns_keys.append(column)
|
||||
|
||||
if column == "dndomain":
|
||||
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"])
|
||||
attributes_set.add("dn")
|
||||
elif column == "dnpath":
|
||||
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"])
|
||||
attributes_set.add("dn")
|
||||
elif column == "dnfullpath":
|
||||
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"])
|
||||
attributes_set.add("dn")
|
||||
else:
|
||||
step_names = column.split(":")
|
||||
attributes_set.add(step_names[0])
|
||||
if len(step_names) > 1:
|
||||
source = step_names.pop(0)
|
||||
self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names)
|
||||
|
||||
if all_attributes:
|
||||
self.attributes = []
|
||||
else:
|
||||
self.attributes = list(attributes_set)
|
||||
|
Loading…
Reference in New Issue
Block a user