14 Commits

10 changed files with 385 additions and 184 deletions


@@ -11,7 +11,8 @@ CLI tool to query LDAP/AD servers
* Classic LDIF
* JSON stream (with detailed or simplified attribute values)
* CSV
* Markdown table with stretched columns (for viewing in CLI / for monospace fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/) (see the sketch below the list)
* HTML
* Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl)
* Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination
* By default the first 1000 entries are shown, and it errors if there are more results
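The markdown table output works by piping CSV rows through csvlook. A minimal standalone sketch of that piping (hypothetical example data; assumes csvkit is installed and csvlook is on PATH):

import csv
import subprocess
import sys

# Hypothetical rows; ldaptool derives these from the LDAP search results.
rows = [("cn", "mail"), ("Jane Doe", "jane@example.com")]

# csvlook (from csvkit) reads CSV on stdin and prints a stretched text table.
proc = subprocess.Popen(["csvlook"], stdin=subprocess.PIPE, stdout=sys.stdout, text=True)
assert proc.stdin is not None
writer = csv.writer(proc.stdin, lineterminator="\n")
for row in rows:
    writer.writerow(row)
proc.stdin.close()
proc.wait()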

debian/changelog vendored

@@ -1,3 +1,23 @@
ldaptool (0.4-1) unstable; urgency=medium

  * move argument/column handling to decoder (prepare for more post-processing in decoder)
  * move json output format handling to main tool from decoder
  * support attribute post-processing; :<len>, and DN :domain, :path, :fullpath
  * use Enum instead of StrEnum for python3.10

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Tue, 02 May 2023 16:54:00 +0200

ldaptool (0.3-1) unstable; urgency=medium

  * ldaptool: move output arguments from search to main
  * run sort internally, refactor table output into separate method
  * refactor table variant handling
  * add html output format
  * README.md: document csvkit dependency
  * debian: require csvkit (markdown table is an essential feature)

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 19:31:37 +0200

ldaptool (0.2-1) unstable; urgency=medium

  * README.md: fix typo

debian/control vendored

@@ -35,6 +35,7 @@ Depends:
 python3-ldaptool (=${binary:Version}),
 ${python3:Depends},
 ${misc:Depends},
 csvkit,
Description: CLI tool to run ldap queries
 CLI tool to query LDAP/AD servers, featuring various output formats
 and a configuration for different realms.


@@ -2,24 +2,114 @@ from __future__ import annotations
import argparse
import csv
import dataclasses
import enum
import html
import subprocess
import sys
import typing

from ldaptool import decode, search
from ldaptool._utils import argclasses
from ldaptool._utils.ldap import Result, SizeLimitExceeded
class TableOutput(enum.Enum):
MARKDOWN = "markdown"
CSV = "csv"
HTML = "html"
def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
cell_s = f"<{cell}>"
cell_e = f"</{cell}>"
return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(search.Arguments):
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
html: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="HTML table output - requires list of attributes",
),
)
table_output: typing.Optional[TableOutput] = None
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use full json output"),
)
human: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
# pick at most one in csv, (markdown) table, html
if [self.csv, self.table, self.html].count(True) > 1:
raise SystemExit("Can't use more than one table output type")
if self.csv:
self.table_output = TableOutput.CSV
elif self.table:
self.table_output = TableOutput.MARKDOWN
elif self.html:
self.table_output = TableOutput.HTML
if self.sort and self.table_output is None:
# default to markdown table
self.table_output = TableOutput.MARKDOWN
if self.table_output:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.json:
raise SystemExit("Can't use both table output and --json")
if self.human:
raise SystemExit("Can't use both table output and --human")
if self.raw:
if self.table_output:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.json or self.human:
raise SystemExit("Decode options require decode; --raw not allowed")
class _Context:
    def __init__(self) -> None:
        parser = argparse.ArgumentParser()
        arguments_p = Arguments.add_to_parser(parser)
        args = parser.parse_args()
        try:
            self.config = search.Config.load()
        except Exception as e:
            raise SystemExit(f"config error: {e}")
        try:
            self.arguments = arguments_p.from_args(args)
        except decode.InvalidStep as e:
            raise SystemExit(f"invalid arguments: {e}")
    def run(self) -> None:
        # starting the search sets the base we want to print
@@ -40,24 +130,60 @@ class _Context:
            output = proc.stdin

        try:
            if self.arguments.table_output == TableOutput.MARKDOWN:
                add_filter(["csvlook"])
            if self.arguments.table_output:
                self._table_output(search_iterator, stream=output)
            else:
                self._ldif_or_json_output(search_iterator, stream=output)
        finally:
            if procs:
                output.close()
                for proc in reversed(procs):
                    proc.wait()
    def _to_table_lines(self, search_iterator: typing.Iterable[Result]) -> typing.Iterable[tuple[str, ...]]:
decoder = decode.Decoder(arguments=self.arguments)
# "human" (json) dicts contain data by lower case key:
column_keys = [col.lower() for col in self.arguments.columns]
try:
for dn, entry in search_iterator:
if dn is None:
continue
# normal entry
assert not isinstance(entry, list)
obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
yield tuple(obj.get(key, "") for key in column_keys)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")
def _table_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
line_iterator = self._to_table_lines(search_iterator)
if self.arguments.sort:
line_iterator = sorted(line_iterator)
if self.arguments.table_output in [TableOutput.CSV, TableOutput.MARKDOWN]:
csv_out = csv.writer(stream, lineterminator="\n")
csv_out.writerow(self.arguments.columns)
for line in line_iterator:
csv_out.writerow(line)
else:
assert self.arguments.table_output == TableOutput.HTML
stream.write("<table>\n")
stream.write(_html_escape_line(self.arguments.columns, cell="th"))
for line in line_iterator:
stream.write(_html_escape_line(line))
stream.write("</table>\n")
    def _ldif_or_json_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
        decoder = decode.Decoder(arguments=self.arguments)

        num_responses = 0
        num_entries = 0

        ldif_output = not (self.arguments.json or self.arguments.human)

        if ldif_output:
            print("# extended LDIF")
@@ -72,22 +198,11 @@ class _Context:
            print("#")
            print()
if self.arguments.csv:
csv_out = csv.DictWriter(
stream,
fieldnames=self.arguments.columns,
lineterminator="\n",
extrasaction="ignore",
)
csv_out.writeheader()
# dicts contain data by lower case key
csv_out.fieldnames = [col.lower() for col in self.arguments.columns]
        try:
            for dn, entry in search_iterator:
                num_responses += 1
                if dn is None:
                    if ldif_output:
                        print("# search reference")
                    for ref in entry:
                        assert isinstance(ref, str)
@@ -97,11 +212,13 @@ class _Context:
                # normal entry
                assert not isinstance(entry, list)
                num_entries += 1
                if ldif_output:
                    decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
                elif self.arguments.human:
                    decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
                else:
                    assert self.arguments.json
                    decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
        except SizeLimitExceeded as e:
            raise SystemExit(f"Error: {e}")


@@ -74,6 +74,9 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")
@dataclasses.dataclass(slots=True, kw_only=True)
class BaseArguments:
    def __post_init__(self) -> None:
        pass

    @classmethod
    def add_fields_to_parser(
        cls: type[_TArgs],


@@ -1,10 +1,12 @@
from __future__ import annotations

from ._decoder import Attribute, Decoder
from ._postprocess import InvalidStep
from .arguments import Arguments

__all__ = [
    "Arguments",
    "Attribute",
    "Decoder",
    "InvalidStep",
]


@@ -8,8 +8,6 @@ import sys
import typing
import uuid
from ldaptool._utils.dninfo import DNInfo
from . import _types
from .arguments import Arguments
@@ -122,16 +120,16 @@ class Attribute:
    def _base64_value(self) -> str:
        return base64.b64encode(self.raw).decode("ascii")

    def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
        if not self.decoded is None:
            comment = self.utf8_clean
            if comment is None:
                comment = self._base64_value
            print(f"{self.name}: {self.decoded} # {comment}", file=file)
        elif not self.utf8_clean is None:
            print(f"{self.name}: {self.utf8_clean}", file=file)
        else:
            print(f"{self.name}:: {self._base64_value}", file=file)
    def to_json(self) -> dict[str, typing.Any]:
        item: dict[str, typing.Any] = {}
@@ -175,55 +173,69 @@ class Decoder:
            name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
            for name, raw_values in entry.items()
        }

        for attr, post_processes in self.arguments.post_process.items():
            if attr == "dn":
                values = [dn]
            else:
                attrs = decoded_entry.get(attr, None)
                if attrs is None:
                    continue
                values = [at.human() for at in attrs]
            for column, post_process in post_processes.items():
                decoded_entry[column] = [
                    Attribute.fake_attribute(column, post_process.process(value)) for value in values
                ]

        return decoded_entry
    def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
        emit: dict[str, typing.Any] = dict(dn=dn)
        for name, attrs in obj.items():
            emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
        return emit

    def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
        emit = self.human(dn=dn, obj=obj)
        json.dump(emit, file, ensure_ascii=False)
        print(file=file)  # terminate output dicts by newline

    def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
        self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

    def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
        emit: dict[str, typing.Any] = dict(dn=dn)
        for name, attrs in obj.items():
            emit[name] = [attr.to_json() for attr in attrs]
        return emit

    def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
        emit = self.json(dn=dn, obj=obj)
        json.dump(emit, file, ensure_ascii=False)
        print(file=file)  # terminate output dicts by newline

    def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
        self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

    def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
        print(f"dn: {dn}", file=file)
        attrs: typing.Optional[list[Attribute]]
        if not self.arguments.attributes:
            # show all attributes - use order from server
            for attrs in obj.values():
                for attr in attrs:
                    attr.print(file=file)
        else:
            # only selected columns; use given order
            for column in self.arguments.columns_keys:
                if column == "dn":
                    continue  # already printed dn
                attrs = obj.get(column, None)
                if attrs is None:
                    continue
                for attr in attrs:
                    attr.print(file=file)
        print(file=file)  # separate entries with newlines

    def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
        self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
for attrs in entry.values():
for attr in attrs:
attr.print()
print() # separate entries with newlines
def emit(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human or self.arguments.json:
self._emit_json(dn=dn, entry=entry)
else:
self._emit_ldif(dn=dn, entry=entry)
def handle(self, *, dn: str, entry: TEntry) -> None:
entry_attrs = self.read(dn=dn, entry=entry)
self.emit(dn=dn, entry=entry_attrs)


@@ -0,0 +1,96 @@
from __future__ import annotations
import abc
import dataclasses
from ldaptool._utils.dninfo import DNInfo
class Step(abc.ABC):
__slots__ = ()
@abc.abstractmethod
def step(self, value: str) -> str:
...
@dataclasses.dataclass(slots=True)
class MaxLength(Step):
limit: int
def step(self, value: str) -> str:
if not self.limit or len(value) <= self.limit:
return value
        return value[: self.limit - 1] + "…"
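A short example of the length step (a sketch, assuming the single-character truncation marker above):

print(MaxLength(8).step("abcdefghij"))  # "abcdefg…" - 7 characters kept plus the marker
print(MaxLength(0).step("abcdefghij"))  # "abcdefghij" - limit 0 disables truncation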
@dataclasses.dataclass(slots=True)
class DNDomain(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.domain
@dataclasses.dataclass(slots=True)
class DNPath(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.path
@dataclasses.dataclass(slots=True)
class DNFullPath(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.full_path
_STEPS = {
"domain": DNDomain(),
"path": DNPath(),
"fullpath": DNFullPath(),
}
class InvalidStep(Exception):
pass
@dataclasses.dataclass(slots=True)
class PostProcess:
steps: list[Step]
def process(self, value: str) -> str:
for step in self.steps:
value = step.step(value)
return value
def parse_steps(steps: list[str]) -> PostProcess:
max_len = 0
try:
max_len = int(steps[-1])
steps.pop()
except ValueError:
pass
result = []
for step in steps:
step_i = _STEPS.get(step, None)
if step_i is None:
raise InvalidStep(f"Unknown post-processing step {step!r}")
result.append(step_i)
if max_len:
result.append(MaxLength(max_len))
return PostProcess(result)
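A usage sketch of the parser above (step names come from _STEPS; an optional trailing integer adds a MaxLength step):

pp = parse_steps(["path", "30"])  # -> PostProcess([DNPath(), MaxLength(30)])
pp.process("CN=Jane Doe,OU=People,DC=example,DC=com")  # path form of the DN, truncated to 30 characters
parse_steps(["nosuchstep"])  # raises InvalidStep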


@@ -1,47 +1,78 @@
from __future__ import annotations

import argparse
import dataclasses

from ldaptool._utils import argclasses

from . import _postprocess
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
            Fake attributes `dndomain`, `dnpath` and `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(argclasses.BaseArguments):
    columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
    columns_keys: list[str] = dataclasses.field(default_factory=list)  # lower case column names
    attributes: list[str] = dataclasses.field(default_factory=list)

    human_separator: str = dataclasses.field(
        default=", ",
        metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
    )

    dateonly: bool = dataclasses.field(
        default=True,
        metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
    )

    post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)

    def __post_init__(self) -> None:
        super(Arguments, self).__post_init__()  # super() not working here, unclear why.

        # extract special attribute names
        all_attributes = False
        attributes_set: set[str] = set()
        self.columns_keys = []
        for column in list(self.columns):
            column = column.lower()

            if column == "*":
                # '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
                self.columns.remove("*")
                all_attributes = True
                continue

            self.columns_keys.append(column)

            if column == "dndomain":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"])
attributes_set.add("dn")
elif column == "dnpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"])
attributes_set.add("dn")
elif column == "dnfullpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"])
attributes_set.add("dn")
else:
step_names = column.split(":")
attributes_set.add(step_names[0])
if len(step_names) > 1:
source = step_names.pop(0)
self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names)
if all_attributes:
self.attributes = []
else:
self.attributes = list(attributes_set)
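A rough sketch of what the column handling above produces for a hypothetical attribute list (import path assumed from the package layout):

from ldaptool.decode.arguments import Arguments

args = Arguments(columns=["cn", "dndomain", "memberOf:path:40"])
# args.columns_keys == ["cn", "dndomain", "memberof:path:40"]
# args.attributes   == ["cn", "dn", "memberof"] (in set order)
# args.post_process maps "dn" -> {"dndomain": domain step} and
#                        "memberof" -> {"memberof:path:40": path step + 40-character limit}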


@@ -1,6 +1,5 @@
from __future__ import annotations

import argparse
import dataclasses
import typing
@@ -8,28 +7,8 @@ import ldaptool.decode.arguments
from ldaptool._utils import argclasses
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(ldaptool.decode.arguments.Arguments):
# overwrite fields for fake attributes to remove them from argparse;
# we enable those based on the attribute list
dndomain: bool = False
dnpath: bool = False
dnfullpath: bool = False
attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
columns: list[str] = dataclasses.field(default_factory=list)
    filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
    find: typing.Optional[str] = dataclasses.field(
        default=None,

@@ -44,10 +23,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
        default=False,
        metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"),
    )
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
    realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in"))
    server: typing.Optional[str] = dataclasses.field(
        default=None,

@@ -77,24 +52,10 @@ class Arguments(ldaptool.decode.arguments.Arguments):
            help="Explicit search base (defaults to root of domain / forest with --gc)",
        ),
    )
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
    def __post_init__(self) -> None:
        super(Arguments, self).__post_init__()  # super() not working here, unclear why.
        if not self.filter is None:
            if not self.find is None:
                raise SystemExit("Can't use both --find and --filter")

@@ -106,46 +67,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):
        else:
            # probably doesn't like empty filter?
            self.filter = "(objectClass=*)"
# can't print both csv and markdown
if self.csv and self.table:
raise SystemExit("Can't use both --table and --csv")
if self.sort:
if not self.table and not self.csv:
# default to markdown table
self.table = True
if self.table:
# markdown requires underlying csv
self.csv = True
# extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
# create fake attributes on demand
if attributes_set.pop("dndomain", ""):
self.dndomain = True
if attributes_set.pop("dnpath", ""):
self.dnpath = True
if attributes_set.pop("dnfullpath", ""):
self.dnfullpath = True
# store remaining attributes (with original case)
self.attributes = list(attributes_set.values())
if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"]
if self.csv:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.json:
raise SystemExit("Can't use both --table / --csv / --sort and --json")
if self.human:
raise SystemExit("Can't use both --table / --csv / --sort and --human")
if self.raw:
if self.csv:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.json or self.human:
raise SystemExit("Decode options require decode; --raw not allowed")