Compare commits
No commits in common. "18a27b195e060fb98f026f880dbdac3c2c5f179b" and "d9803c226e0314a7d94a723691137a326ca109de" have entirely different histories.
18a27b195e
...
d9803c226e
9
debian/changelog
vendored
9
debian/changelog
vendored
@ -1,12 +1,3 @@
|
|||||||
ldaptool (0.4-1) unstable; urgency=medium
|
|
||||||
|
|
||||||
* move argument/column handling to decoder (prepare for more post-processing in decoder)
|
|
||||||
* move json output format handling to main tool from decoder
|
|
||||||
* support attribute post-processing; :<len>, and DN :domain, :path, :fullpath
|
|
||||||
* use Enum instead of StrEnum for python3.10
|
|
||||||
|
|
||||||
-- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de> Tue, 02 May 2023 16:54:00 +0200
|
|
||||||
|
|
||||||
ldaptool (0.3-1) unstable; urgency=medium
|
ldaptool (0.3-1) unstable; urgency=medium
|
||||||
|
|
||||||
* ldaptool: move output arguments from search to main
|
* ldaptool: move output arguments from search to main
|
||||||
|
@ -14,7 +14,7 @@ from ldaptool._utils import argclasses
|
|||||||
from ldaptool._utils.ldap import Result, SizeLimitExceeded
|
from ldaptool._utils.ldap import Result, SizeLimitExceeded
|
||||||
|
|
||||||
|
|
||||||
class TableOutput(enum.Enum):
|
class TableOutput(enum.StrEnum):
|
||||||
MARKDOWN = "markdown"
|
MARKDOWN = "markdown"
|
||||||
CSV = "csv"
|
CSV = "csv"
|
||||||
HTML = "html"
|
HTML = "html"
|
||||||
@ -42,27 +42,19 @@ class Arguments(search.Arguments):
|
|||||||
help="Markdown table output - requires list of attributes",
|
help="Markdown table output - requires list of attributes",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
table_output: typing.Optional[TableOutput] = None
|
||||||
html: bool = dataclasses.field(
|
html: bool = dataclasses.field(
|
||||||
default=False,
|
default=False,
|
||||||
metadata=argclasses.arg(
|
metadata=argclasses.arg(
|
||||||
help="HTML table output - requires list of attributes",
|
help="HTML table output - requires list of attributes",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
table_output: typing.Optional[TableOutput] = None
|
|
||||||
sort: bool = dataclasses.field(
|
sort: bool = dataclasses.field(
|
||||||
default=False,
|
default=False,
|
||||||
metadata=argclasses.arg(
|
metadata=argclasses.arg(
|
||||||
help="Sorted table output - defaults to markdown --table unless --csv is given",
|
help="Sorted table output - defaults to markdown --table unless --csv is given",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
json: bool = dataclasses.field(
|
|
||||||
default=False,
|
|
||||||
metadata=argclasses.arg(help="Use full json output"),
|
|
||||||
)
|
|
||||||
human: bool = dataclasses.field(
|
|
||||||
default=False,
|
|
||||||
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
|
|
||||||
)
|
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
super(Arguments, self).__post_init__() # super() not working here, unclear why.
|
super(Arguments, self).__post_init__() # super() not working here, unclear why.
|
||||||
@ -106,10 +98,7 @@ class _Context:
|
|||||||
self.config = search.Config.load()
|
self.config = search.Config.load()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise SystemExit(f"config error: {e}")
|
raise SystemExit(f"config error: {e}")
|
||||||
try:
|
self.arguments = arguments_p.from_args(args)
|
||||||
self.arguments = arguments_p.from_args(args)
|
|
||||||
except decode.InvalidStep as e:
|
|
||||||
raise SystemExit(f"invalid arguments: {e}")
|
|
||||||
|
|
||||||
def run(self) -> None:
|
def run(self) -> None:
|
||||||
# starting the search sets the base we want to print
|
# starting the search sets the base we want to print
|
||||||
@ -152,7 +141,7 @@ class _Context:
|
|||||||
continue
|
continue
|
||||||
# normal entry
|
# normal entry
|
||||||
assert not isinstance(entry, list)
|
assert not isinstance(entry, list)
|
||||||
obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
|
obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
|
||||||
yield tuple(obj.get(key, "") for key in column_keys)
|
yield tuple(obj.get(key, "") for key in column_keys)
|
||||||
except SizeLimitExceeded as e:
|
except SizeLimitExceeded as e:
|
||||||
raise SystemExit(f"Error: {e}")
|
raise SystemExit(f"Error: {e}")
|
||||||
@ -212,13 +201,8 @@ class _Context:
|
|||||||
# normal entry
|
# normal entry
|
||||||
assert not isinstance(entry, list)
|
assert not isinstance(entry, list)
|
||||||
num_entries += 1
|
num_entries += 1
|
||||||
if ldif_output:
|
obj = decoder.read(dn=dn, entry=entry)
|
||||||
decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
|
decoder.emit(dn=dn, entry=obj)
|
||||||
elif self.arguments.human:
|
|
||||||
decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
|
|
||||||
else:
|
|
||||||
assert self.arguments.json
|
|
||||||
decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
|
|
||||||
except SizeLimitExceeded as e:
|
except SizeLimitExceeded as e:
|
||||||
raise SystemExit(f"Error: {e}")
|
raise SystemExit(f"Error: {e}")
|
||||||
|
|
||||||
|
@ -74,9 +74,6 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")
|
|||||||
|
|
||||||
@dataclasses.dataclass(slots=True, kw_only=True)
|
@dataclasses.dataclass(slots=True, kw_only=True)
|
||||||
class BaseArguments:
|
class BaseArguments:
|
||||||
def __post_init__(self) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def add_fields_to_parser(
|
def add_fields_to_parser(
|
||||||
cls: type[_TArgs],
|
cls: type[_TArgs],
|
||||||
|
@ -1,12 +1,10 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from ._decoder import Attribute, Decoder
|
from ._decoder import Attribute, Decoder
|
||||||
from ._postprocess import InvalidStep
|
|
||||||
from .arguments import Arguments
|
from .arguments import Arguments
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"Arguments",
|
"Arguments",
|
||||||
"Attribute",
|
"Attribute",
|
||||||
"Decoder",
|
"Decoder",
|
||||||
"InvalidStep",
|
|
||||||
]
|
]
|
||||||
|
@ -8,6 +8,8 @@ import sys
|
|||||||
import typing
|
import typing
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from ldaptool._utils.dninfo import DNInfo
|
||||||
|
|
||||||
from . import _types
|
from . import _types
|
||||||
from .arguments import Arguments
|
from .arguments import Arguments
|
||||||
|
|
||||||
@ -120,16 +122,16 @@ class Attribute:
|
|||||||
def _base64_value(self) -> str:
|
def _base64_value(self) -> str:
|
||||||
return base64.b64encode(self.raw).decode("ascii")
|
return base64.b64encode(self.raw).decode("ascii")
|
||||||
|
|
||||||
def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
|
def print(self) -> None:
|
||||||
if not self.decoded is None:
|
if not self.decoded is None:
|
||||||
comment = self.utf8_clean
|
comment = self.utf8_clean
|
||||||
if comment is None:
|
if comment is None:
|
||||||
comment = self._base64_value
|
comment = self._base64_value
|
||||||
print(f"{self.name}: {self.decoded} # {comment}", file=file)
|
print(f"{self.name}: {self.decoded} # {comment}")
|
||||||
elif not self.utf8_clean is None:
|
elif not self.utf8_clean is None:
|
||||||
print(f"{self.name}: {self.utf8_clean}", file=file)
|
print(f"{self.name}: {self.utf8_clean}")
|
||||||
else:
|
else:
|
||||||
print(f"{self.name}:: {self._base64_value}", file=file)
|
print(f"{self.name}:: {self._base64_value}")
|
||||||
|
|
||||||
def to_json(self) -> dict[str, typing.Any]:
|
def to_json(self) -> dict[str, typing.Any]:
|
||||||
item: dict[str, typing.Any] = {}
|
item: dict[str, typing.Any] = {}
|
||||||
@ -173,69 +175,55 @@ class Decoder:
|
|||||||
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
|
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
|
||||||
for name, raw_values in entry.items()
|
for name, raw_values in entry.items()
|
||||||
}
|
}
|
||||||
|
if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
|
||||||
for attr, post_processes in self.arguments.post_process.items():
|
dninfo = DNInfo(dn=dn)
|
||||||
if attr == "dn":
|
if self.arguments.dndomain:
|
||||||
values = [dn]
|
decoded_entry["dndomain"] = [
|
||||||
else:
|
Attribute.fake_attribute("dndomain", dninfo.domain),
|
||||||
attrs = decoded_entry.get(attr, None)
|
]
|
||||||
if attrs is None:
|
if self.arguments.dnpath:
|
||||||
continue
|
decoded_entry["dnpath"] = [
|
||||||
values = [at.human() for at in attrs]
|
Attribute.fake_attribute("dnpath", dninfo.path),
|
||||||
for column, post_process in post_processes.items():
|
]
|
||||||
decoded_entry[column] = [
|
if self.arguments.dnfullpath:
|
||||||
Attribute.fake_attribute(column, post_process.process(value)) for value in values
|
decoded_entry["dnfullpath"] = [
|
||||||
|
Attribute.fake_attribute("dnfullpath", dninfo.full_path),
|
||||||
]
|
]
|
||||||
|
|
||||||
return decoded_entry
|
return decoded_entry
|
||||||
|
|
||||||
def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
|
def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
|
||||||
emit: dict[str, typing.Any] = dict(dn=dn)
|
emit: dict[str, typing.Any] = dict(dn=dn)
|
||||||
for name, attrs in obj.items():
|
for name, attrs in entry.items():
|
||||||
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
|
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
|
||||||
return emit
|
return emit
|
||||||
|
|
||||||
def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
|
def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
|
||||||
emit = self.human(dn=dn, obj=obj)
|
|
||||||
json.dump(emit, file, ensure_ascii=False)
|
|
||||||
print(file=file) # terminate output dicts by newline
|
|
||||||
|
|
||||||
def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
|
|
||||||
self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
|
|
||||||
|
|
||||||
def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
|
|
||||||
emit: dict[str, typing.Any] = dict(dn=dn)
|
emit: dict[str, typing.Any] = dict(dn=dn)
|
||||||
for name, attrs in obj.items():
|
for name, attrs in entry.items():
|
||||||
emit[name] = [attr.to_json() for attr in attrs]
|
emit[name] = [attr.to_json() for attr in attrs]
|
||||||
return emit
|
return emit
|
||||||
|
|
||||||
def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
|
def _emit_json(self, *, dn: str, entry: TDecoded) -> None:
|
||||||
emit = self.json(dn=dn, obj=obj)
|
if self.arguments.human:
|
||||||
json.dump(emit, file, ensure_ascii=False)
|
emit = self.human(dn=dn, entry=entry)
|
||||||
print(file=file) # terminate output dicts by newline
|
|
||||||
|
|
||||||
def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
|
|
||||||
self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
|
|
||||||
|
|
||||||
def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
|
|
||||||
print(f"dn: {dn}", file=file)
|
|
||||||
attrs: typing.Optional[list[Attribute]]
|
|
||||||
if not self.arguments.attributes:
|
|
||||||
# show all attributes - use order from server
|
|
||||||
for attrs in obj.values():
|
|
||||||
for attr in attrs:
|
|
||||||
attr.print(file=file)
|
|
||||||
else:
|
else:
|
||||||
# only selected columns; use given order
|
emit = self.json(dn=dn, entry=entry)
|
||||||
for column in self.arguments.columns_keys:
|
json.dump(emit, sys.stdout, ensure_ascii=False)
|
||||||
if column == "dn":
|
print() # terminate output dicts by newline
|
||||||
continue # already printed dn
|
|
||||||
attrs = obj.get(column, None)
|
|
||||||
if attrs is None:
|
|
||||||
continue
|
|
||||||
for attr in attrs:
|
|
||||||
attr.print(file=file)
|
|
||||||
print(file=file) # separate entries with newlines
|
|
||||||
|
|
||||||
def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
|
def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None:
|
||||||
self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
|
print(f"dn: {dn}")
|
||||||
|
for attrs in entry.values():
|
||||||
|
for attr in attrs:
|
||||||
|
attr.print()
|
||||||
|
print() # separate entries with newlines
|
||||||
|
|
||||||
|
def emit(self, *, dn: str, entry: TDecoded) -> None:
|
||||||
|
if self.arguments.human or self.arguments.json:
|
||||||
|
self._emit_json(dn=dn, entry=entry)
|
||||||
|
else:
|
||||||
|
self._emit_ldif(dn=dn, entry=entry)
|
||||||
|
|
||||||
|
def handle(self, *, dn: str, entry: TEntry) -> None:
|
||||||
|
entry_attrs = self.read(dn=dn, entry=entry)
|
||||||
|
self.emit(dn=dn, entry=entry_attrs)
|
||||||
|
@ -1,96 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import abc
|
|
||||||
import dataclasses
|
|
||||||
|
|
||||||
from ldaptool._utils.dninfo import DNInfo
|
|
||||||
|
|
||||||
|
|
||||||
class Step(abc.ABC):
|
|
||||||
__slots__ = ()
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def step(self, value: str) -> str:
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True)
|
|
||||||
class MaxLength(Step):
|
|
||||||
limit: int
|
|
||||||
|
|
||||||
def step(self, value: str) -> str:
|
|
||||||
if not self.limit or len(value) <= self.limit:
|
|
||||||
return value
|
|
||||||
return value[: self.limit - 1] + "…"
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True)
|
|
||||||
class DNDomain(Step):
|
|
||||||
def step(self, value: str) -> str:
|
|
||||||
try:
|
|
||||||
dninfo = DNInfo(dn=value)
|
|
||||||
except Exception:
|
|
||||||
# not a valid DN -> no processing
|
|
||||||
return value
|
|
||||||
return dninfo.domain
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True)
|
|
||||||
class DNPath(Step):
|
|
||||||
def step(self, value: str) -> str:
|
|
||||||
try:
|
|
||||||
dninfo = DNInfo(dn=value)
|
|
||||||
except Exception:
|
|
||||||
# not a valid DN -> no processing
|
|
||||||
return value
|
|
||||||
return dninfo.path
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True)
|
|
||||||
class DNFullPath(Step):
|
|
||||||
def step(self, value: str) -> str:
|
|
||||||
try:
|
|
||||||
dninfo = DNInfo(dn=value)
|
|
||||||
except Exception:
|
|
||||||
# not a valid DN -> no processing
|
|
||||||
return value
|
|
||||||
return dninfo.full_path
|
|
||||||
|
|
||||||
|
|
||||||
_STEPS = {
|
|
||||||
"domain": DNDomain(),
|
|
||||||
"path": DNPath(),
|
|
||||||
"fullpath": DNFullPath(),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class InvalidStep(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True)
|
|
||||||
class PostProcess:
|
|
||||||
steps: list[Step]
|
|
||||||
|
|
||||||
def process(self, value: str) -> str:
|
|
||||||
for step in self.steps:
|
|
||||||
value = step.step(value)
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
def parse_steps(steps: list[str]) -> PostProcess:
|
|
||||||
max_len = 0
|
|
||||||
try:
|
|
||||||
max_len = int(steps[-1])
|
|
||||||
steps.pop()
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
result = []
|
|
||||||
for step in steps:
|
|
||||||
step_i = _STEPS.get(step, None)
|
|
||||||
if step_i is None:
|
|
||||||
raise InvalidStep(f"Unknown post-processing step {step!r}")
|
|
||||||
result.append(step_i)
|
|
||||||
if max_len:
|
|
||||||
result.append(MaxLength(max_len))
|
|
||||||
return PostProcess(result)
|
|
@ -1,78 +1,47 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
|
||||||
import dataclasses
|
import dataclasses
|
||||||
|
|
||||||
from ldaptool._utils import argclasses
|
from ldaptool._utils import argclasses
|
||||||
|
|
||||||
from . import _postprocess
|
|
||||||
|
|
||||||
|
|
||||||
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
|
|
||||||
parser.add_argument(
|
|
||||||
metavar="attributes",
|
|
||||||
dest=dest,
|
|
||||||
nargs="*",
|
|
||||||
help="""
|
|
||||||
Attributes to lookup (and columns to display in tables).
|
|
||||||
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
|
|
||||||
""",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True, kw_only=True)
|
@dataclasses.dataclass(slots=True, kw_only=True)
|
||||||
class Arguments(argclasses.BaseArguments):
|
class Arguments(argclasses.BaseArguments):
|
||||||
columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
|
json: bool = dataclasses.field(
|
||||||
columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names
|
default=False,
|
||||||
attributes: list[str] = dataclasses.field(default_factory=list)
|
metadata=argclasses.arg(help="Use full json output"),
|
||||||
|
)
|
||||||
|
human: bool = dataclasses.field(
|
||||||
|
default=False,
|
||||||
|
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
|
||||||
|
)
|
||||||
human_separator: str = dataclasses.field(
|
human_separator: str = dataclasses.field(
|
||||||
default=", ",
|
default=", ",
|
||||||
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
|
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
|
||||||
)
|
)
|
||||||
|
|
||||||
dateonly: bool = dataclasses.field(
|
dateonly: bool = dataclasses.field(
|
||||||
default=True,
|
default=True,
|
||||||
metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
|
metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
|
||||||
)
|
)
|
||||||
|
dndomain: bool = dataclasses.field(
|
||||||
post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)
|
default=False,
|
||||||
|
metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"),
|
||||||
def __post_init__(self) -> None:
|
)
|
||||||
super(Arguments, self).__post_init__() # super() not working here, unclear why.
|
dnpath: bool = dataclasses.field(
|
||||||
|
default=False,
|
||||||
# extract special attribute names
|
metadata=argclasses.arg(
|
||||||
all_attributes = False
|
help="""
|
||||||
attributes_set: set[str] = set()
|
Whether to export a virtual dnpath attribute
|
||||||
self.columns_keys = []
|
('/' joined values of reversed DN without DNS labels)
|
||||||
for column in list(self.columns):
|
"""
|
||||||
column = column.lower()
|
),
|
||||||
|
)
|
||||||
if column == "*":
|
dnfullpath: bool = dataclasses.field(
|
||||||
# '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
|
default=False,
|
||||||
self.columns.remove("*")
|
metadata=argclasses.arg(
|
||||||
all_attributes = True
|
help="""
|
||||||
continue
|
Whether to export a virtual dnfullpath attribute
|
||||||
|
('/' joined values of reversed DN; DNS domain as first label)
|
||||||
self.columns_keys.append(column)
|
"""
|
||||||
|
),
|
||||||
if column == "dndomain":
|
)
|
||||||
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"])
|
|
||||||
attributes_set.add("dn")
|
|
||||||
elif column == "dnpath":
|
|
||||||
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"])
|
|
||||||
attributes_set.add("dn")
|
|
||||||
elif column == "dnfullpath":
|
|
||||||
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"])
|
|
||||||
attributes_set.add("dn")
|
|
||||||
else:
|
|
||||||
step_names = column.split(":")
|
|
||||||
attributes_set.add(step_names[0])
|
|
||||||
if len(step_names) > 1:
|
|
||||||
source = step_names.pop(0)
|
|
||||||
self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names)
|
|
||||||
|
|
||||||
if all_attributes:
|
|
||||||
self.attributes = []
|
|
||||||
else:
|
|
||||||
self.attributes = list(attributes_set)
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
import dataclasses
|
import dataclasses
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
@ -7,8 +8,28 @@ import ldaptool.decode.arguments
|
|||||||
from ldaptool._utils import argclasses
|
from ldaptool._utils import argclasses
|
||||||
|
|
||||||
|
|
||||||
|
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
|
||||||
|
parser.add_argument(
|
||||||
|
metavar="attributes",
|
||||||
|
dest=dest,
|
||||||
|
nargs="*",
|
||||||
|
help="""
|
||||||
|
Attributes to lookup (and columns to display in tables).
|
||||||
|
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True, kw_only=True)
|
@dataclasses.dataclass(slots=True, kw_only=True)
|
||||||
class Arguments(ldaptool.decode.arguments.Arguments):
|
class Arguments(ldaptool.decode.arguments.Arguments):
|
||||||
|
# overwrite fields for fake attributes to remove them from argparse;
|
||||||
|
# we enable those based on the attribute list
|
||||||
|
dndomain: bool = False
|
||||||
|
dnpath: bool = False
|
||||||
|
dnfullpath: bool = False
|
||||||
|
|
||||||
|
attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
|
||||||
|
columns: list[str] = dataclasses.field(default_factory=list)
|
||||||
filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
|
filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
|
||||||
find: typing.Optional[str] = dataclasses.field(
|
find: typing.Optional[str] = dataclasses.field(
|
||||||
default=None,
|
default=None,
|
||||||
@ -54,8 +75,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
super(Arguments, self).__post_init__() # super() not working here, unclear why.
|
|
||||||
|
|
||||||
if not self.filter is None:
|
if not self.filter is None:
|
||||||
if not self.find is None:
|
if not self.find is None:
|
||||||
raise SystemExit("Can't use both --find and --filter")
|
raise SystemExit("Can't use both --find and --filter")
|
||||||
@ -67,3 +86,19 @@ class Arguments(ldaptool.decode.arguments.Arguments):
|
|||||||
else:
|
else:
|
||||||
# probably doesn't like empty filter?
|
# probably doesn't like empty filter?
|
||||||
self.filter = "(objectClass=*)"
|
self.filter = "(objectClass=*)"
|
||||||
|
|
||||||
|
# extract special attribute names
|
||||||
|
self.columns = self.attributes # use all names for columns (headings and their order)
|
||||||
|
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
|
||||||
|
# create fake attributes on demand
|
||||||
|
if attributes_set.pop("dndomain", ""):
|
||||||
|
self.dndomain = True
|
||||||
|
if attributes_set.pop("dnpath", ""):
|
||||||
|
self.dnpath = True
|
||||||
|
if attributes_set.pop("dnfullpath", ""):
|
||||||
|
self.dnfullpath = True
|
||||||
|
# store remaining attributes (with original case)
|
||||||
|
self.attributes = list(attributes_set.values())
|
||||||
|
if self.columns and not self.attributes:
|
||||||
|
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
|
||||||
|
self.attributes = ["dn"]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user