12 Commits

14 changed files with 371 additions and 118 deletions

debian/changelog vendored Normal file

@@ -0,0 +1,38 @@
ldaptool (0.4-1) unstable; urgency=medium

  * move argument/column handling to decoder (prepare for more post-processing in decoder)
  * move json output format handling to main tool from decoder
  * support attribute post-processing: :<len>, and DN :domain, :path, :fullpath
  * use Enum instead of StrEnum for python3.10

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Tue, 02 May 2023 16:54:00 +0200

ldaptool (0.3-1) unstable; urgency=medium

  * ldaptool: move output arguments from search to main
  * run sort internally, refactor table output into separate method
  * refactor table variant handling
  * add html output format
  * README.md: document csvkit dependency
  * debian: require csvkit (markdown table is an essential feature)

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 19:31:37 +0200

ldaptool (0.2-1) unstable; urgency=medium

  * README.md: fix typo
  * enable tls unless kerberos is used (SASL GSS-API doesn't seem to work over TLS)

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 17:21:35 +0200

ldaptool (0.1-1) unstable; urgency=medium

  * Initial release.

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:30 +0200

ldaptool (0.1-0) unstable; urgency=medium

  * Stub ITP lintian.

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:29 +0200

debian/control vendored Normal file

@@ -0,0 +1,43 @@
Source: ldaptool
Section: net
Priority: optional
Maintainer: Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
Rules-Requires-Root: no
Build-Depends:
 debhelper-compat (= 13),
 pybuild-plugin-pyproject,
 flit,
 dh-sequence-python3,
 python3,
 python3-ldap,
 python3-yaml,
 python3-pykeepass,
#Testsuite: autopkgtest-pkg-python
Standards-Version: 4.6.2
Homepage: https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool

Package: python3-ldaptool
Architecture: all
Depends:
 ${python3:Depends},
 ${misc:Depends},
Recommends:
 python3-pykeepass,
Description: CLI tool to run ldap queries
 CLI tool to query LDAP/AD servers, featuring various output formats
 and a configuration for different realms.
 .
 This package installs the library for Python 3.

Package: ldaptool
Architecture: all
Depends:
 python3-ldaptool (=${binary:Version}),
 ${python3:Depends},
 ${misc:Depends},
 csvkit,
Description: CLI tool to run ldap queries
 CLI tool to query LDAP/AD servers, featuring various output formats
 and a configuration for different realms.
 .
 This package installs the script.

debian/copyright vendored Normal file

@@ -0,0 +1,27 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Source: <https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool>
Upstream-Name: ldaptool

Files: *
Copyright:
 2023 Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
 2023 Daniel Dizdarevic <daniel.dizdarevic@tik.uni-stuttgart.de>
License: MIT
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
 .
 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.
 .
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.

debian/gbp.conf vendored Normal file

@@ -0,0 +1,5 @@
[DEFAULT]
pristine-tar = False
upstream-branch = main
debian-branch = debian
upstream-tag = ldaptool-%(version)s

debian/rules vendored Executable file

@@ -0,0 +1,13 @@
#!/usr/bin/make -f

export PYBUILD_NAME=ldaptool

%:
	dh $@ --buildsystem=pybuild

# we want /usr/bin/ldaptool in a separate package
override_dh_auto_install:
	dh_auto_install
	mkdir -p debian/ldaptool/usr
	mv debian/python3-ldaptool/usr/bin debian/ldaptool/usr/

debian/source/format vendored Normal file

@@ -0,0 +1 @@
3.0 (quilt)

debian/source/options vendored Normal file

@@ -0,0 +1 @@
extend-diff-ignore = "^[^/]*[.]egg-info/|^[.]vscode|/__pycache__/|^venv/|^.mypy_cache/"


@@ -14,7 +14,7 @@ from ldaptool._utils import argclasses
 from ldaptool._utils.ldap import Result, SizeLimitExceeded
 
 
-class TableOutput(enum.StrEnum):
+class TableOutput(enum.Enum):
     MARKDOWN = "markdown"
     CSV = "csv"
     HTML = "html"
@@ -42,19 +42,27 @@ class Arguments(search.Arguments):
             help="Markdown table output - requires list of attributes",
         ),
     )
-    table_output: typing.Optional[TableOutput] = None
     html: bool = dataclasses.field(
         default=False,
         metadata=argclasses.arg(
            help="HTML table output - requires list of attributes",
        ),
    )
+    table_output: typing.Optional[TableOutput] = None
     sort: bool = dataclasses.field(
         default=False,
         metadata=argclasses.arg(
             help="Sorted table output - defaults to markdown --table unless --csv is given",
         ),
     )
+    json: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(help="Use full json output"),
+    )
+    human: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
+    )
 
     def __post_init__(self) -> None:
         super(Arguments, self).__post_init__()  # super() not working here, unclear why.
@@ -98,7 +106,10 @@ class _Context:
             self.config = search.Config.load()
         except Exception as e:
             raise SystemExit(f"config error: {e}")
-        self.arguments = arguments_p.from_args(args)
+        try:
+            self.arguments = arguments_p.from_args(args)
+        except decode.InvalidStep as e:
+            raise SystemExit(f"invalid arguments: {e}")
 
     def run(self) -> None:
         # starting the search sets the base we want to print
@@ -141,7 +152,7 @@ class _Context:
                     continue
                # normal entry
                assert not isinstance(entry, list)
-                obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
+                obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
                yield tuple(obj.get(key, "") for key in column_keys)
        except SizeLimitExceeded as e:
            raise SystemExit(f"Error: {e}")
@@ -201,8 +212,13 @@ class _Context:
                 # normal entry
                 assert not isinstance(entry, list)
                 num_entries += 1
-                obj = decoder.read(dn=dn, entry=entry)
-                decoder.emit(dn=dn, entry=obj)
+                if ldif_output:
+                    decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
+                elif self.arguments.human:
+                    decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
+                else:
+                    assert self.arguments.json
+                    decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
         except SizeLimitExceeded as e:
             raise SystemExit(f"Error: {e}")

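The ldif/human/json split above replaces the old decoder.emit dispatch: the choice of output format now lives in the main tool, and the decoder offers one entry point per format. A minimal sketch of the new calls (the Decoder(arguments=...) construction and the sample entry are assumptions for illustration, not taken from the diff):

    import sys

    from ldaptool import decode

    args = decode.Arguments()  # normally filled from the command line
    decoder = decode.Decoder(arguments=args)  # constructor shape assumed

    dn = "CN=jdoe,OU=people,DC=example,DC=com"
    entry = {"cn": [b"jdoe"], "mail": [b"jdoe@example.com"]}  # raw LDAP entry (TEntry)

    # each helper decodes the raw entry and emits exactly one format:
    decoder.read_and_emit_ldif(dn=dn, entry=entry, file=sys.stdout)   # LDIF-style lines
    decoder.read_and_emit_human(dn=dn, entry=entry, file=sys.stdout)  # one JSON object, values joined
    decoder.read_and_emit_json(dn=dn, entry=entry, file=sys.stdout)   # one JSON object, full value info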

@@ -74,6 +74,9 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")
 
 @dataclasses.dataclass(slots=True, kw_only=True)
 class BaseArguments:
+    def __post_init__(self) -> None:
+        pass
+
     @classmethod
     def add_fields_to_parser(
         cls: type[_TArgs],


@@ -1,10 +1,12 @@
 from __future__ import annotations
 
 from ._decoder import Attribute, Decoder
+from ._postprocess import InvalidStep
 from .arguments import Arguments
 
 __all__ = [
     "Arguments",
     "Attribute",
     "Decoder",
+    "InvalidStep",
 ]


@@ -8,8 +8,6 @@ import sys
 import typing
 import uuid
 
-from ldaptool._utils.dninfo import DNInfo
-
 from . import _types
 from .arguments import Arguments
 
@@ -122,16 +120,16 @@ class Attribute:
     def _base64_value(self) -> str:
         return base64.b64encode(self.raw).decode("ascii")
 
-    def print(self) -> None:
+    def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
         if not self.decoded is None:
             comment = self.utf8_clean
             if comment is None:
                 comment = self._base64_value
-            print(f"{self.name}: {self.decoded} # {comment}")
+            print(f"{self.name}: {self.decoded} # {comment}", file=file)
         elif not self.utf8_clean is None:
-            print(f"{self.name}: {self.utf8_clean}")
+            print(f"{self.name}: {self.utf8_clean}", file=file)
         else:
-            print(f"{self.name}:: {self._base64_value}")
+            print(f"{self.name}:: {self._base64_value}", file=file)
 
     def to_json(self) -> dict[str, typing.Any]:
         item: dict[str, typing.Any] = {}
@@ -175,55 +173,69 @@ class Decoder:
             name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
             for name, raw_values in entry.items()
         }
-        if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
-            dninfo = DNInfo(dn=dn)
-            if self.arguments.dndomain:
-                decoded_entry["dndomain"] = [
-                    Attribute.fake_attribute("dndomain", dninfo.domain),
-                ]
-            if self.arguments.dnpath:
-                decoded_entry["dnpath"] = [
-                    Attribute.fake_attribute("dnpath", dninfo.path),
-                ]
-            if self.arguments.dnfullpath:
-                decoded_entry["dnfullpath"] = [
-                    Attribute.fake_attribute("dnfullpath", dninfo.full_path),
-                ]
+
+        for attr, post_processes in self.arguments.post_process.items():
+            if attr == "dn":
+                values = [dn]
+            else:
+                attrs = decoded_entry.get(attr, None)
+                if attrs is None:
+                    continue
+                values = [at.human() for at in attrs]
+            for column, post_process in post_processes.items():
+                decoded_entry[column] = [
+                    Attribute.fake_attribute(column, post_process.process(value)) for value in values
+                ]
         return decoded_entry
 
-    def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
+    def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
         emit: dict[str, typing.Any] = dict(dn=dn)
-        for name, attrs in entry.items():
+        for name, attrs in obj.items():
             emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
         return emit
 
-    def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
+    def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.human(dn=dn, obj=obj)
+        json.dump(emit, file, ensure_ascii=False)
+        print(file=file)  # terminate output dicts by newline
+
+    def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
+
+    def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
         emit: dict[str, typing.Any] = dict(dn=dn)
-        for name, attrs in entry.items():
+        for name, attrs in obj.items():
             emit[name] = [attr.to_json() for attr in attrs]
         return emit
 
-    def _emit_json(self, *, dn: str, entry: TDecoded) -> None:
-        if self.arguments.human:
-            emit = self.human(dn=dn, entry=entry)
-        else:
-            emit = self.json(dn=dn, entry=entry)
-        json.dump(emit, sys.stdout, ensure_ascii=False)
-        print()  # terminate output dicts by newline
+    def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.json(dn=dn, obj=obj)
+        json.dump(emit, file, ensure_ascii=False)
+        print(file=file)  # terminate output dicts by newline
 
-    def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None:
-        print(f"dn: {dn}")
-        for attrs in entry.values():
-            for attr in attrs:
-                attr.print()
-        print()  # separate entries with newlines
+    def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
 
-    def emit(self, *, dn: str, entry: TDecoded) -> None:
-        if self.arguments.human or self.arguments.json:
-            self._emit_json(dn=dn, entry=entry)
-        else:
-            self._emit_ldif(dn=dn, entry=entry)
+    def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        print(f"dn: {dn}", file=file)
+        attrs: typing.Optional[list[Attribute]]
+        if not self.arguments.attributes:
+            # show all attributes - use order from server
+            for attrs in obj.values():
+                for attr in attrs:
+                    attr.print(file=file)
+        else:
+            # only selected columns; use given order
+            for column in self.arguments.columns_keys:
+                if column == "dn":
+                    continue  # already printed dn
+                attrs = obj.get(column, None)
+                if attrs is None:
+                    continue
+                for attr in attrs:
+                    attr.print(file=file)
+        print(file=file)  # separate entries with newlines
 
-    def handle(self, *, dn: str, entry: TEntry) -> None:
-        entry_attrs = self.read(dn=dn, entry=entry)
-        self.emit(dn=dn, entry=entry_attrs)
+    def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

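For reference, the three branches of Attribute.print above produce LDIF-like lines: "name: decoded # original", "name: value", or "name:: base64". A small sketch of the plain-UTF-8 branch (assuming Attribute decodes its raw value on construction, as the surrounding code suggests):

    import sys

    from ldaptool.decode import Arguments
    from ldaptool.decode._decoder import Attribute

    args = Arguments()  # all decode arguments have defaults
    attr = Attribute(name="cn", raw=b"jdoe", arguments=args)
    attr.print(file=sys.stdout)  # -> "cn: jdoe"; a binary value would print "cn:: <base64>"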

@@ -0,0 +1,96 @@
from __future__ import annotations

import abc
import dataclasses

from ldaptool._utils.dninfo import DNInfo


class Step(abc.ABC):
    __slots__ = ()

    @abc.abstractmethod
    def step(self, value: str) -> str:
        ...


@dataclasses.dataclass(slots=True)
class MaxLength(Step):
    limit: int

    def step(self, value: str) -> str:
        if not self.limit or len(value) <= self.limit:
            return value
        return value[: self.limit - 1] + "…"


@dataclasses.dataclass(slots=True)
class DNDomain(Step):
    def step(self, value: str) -> str:
        try:
            dninfo = DNInfo(dn=value)
        except Exception:
            # not a valid DN -> no processing
            return value
        return dninfo.domain


@dataclasses.dataclass(slots=True)
class DNPath(Step):
    def step(self, value: str) -> str:
        try:
            dninfo = DNInfo(dn=value)
        except Exception:
            # not a valid DN -> no processing
            return value
        return dninfo.path


@dataclasses.dataclass(slots=True)
class DNFullPath(Step):
    def step(self, value: str) -> str:
        try:
            dninfo = DNInfo(dn=value)
        except Exception:
            # not a valid DN -> no processing
            return value
        return dninfo.full_path


_STEPS = {
    "domain": DNDomain(),
    "path": DNPath(),
    "fullpath": DNFullPath(),
}


class InvalidStep(Exception):
    pass


@dataclasses.dataclass(slots=True)
class PostProcess:
    steps: list[Step]

    def process(self, value: str) -> str:
        for step in self.steps:
            value = step.step(value)
        return value


def parse_steps(steps: list[str]) -> PostProcess:
    max_len = 0
    try:
        max_len = int(steps[-1])
        steps.pop()
    except ValueError:
        pass
    result = []
    for step in steps:
        step_i = _STEPS.get(step, None)
        if step_i is None:
            raise InvalidStep(f"Unknown post-processing step {step!r}")
        result.append(step_i)
    if max_len:
        result.append(MaxLength(max_len))
    return PostProcess(result)

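Reading the new module together: parse_steps turns the ":"-separated suffixes of a column name into a step list, and a trailing integer becomes a MaxLength step. A short usage sketch (the DN value is made up; DNInfo.domain joining the DC= components into "example.com" is inferred from the dndomain semantics):

    from ldaptool.decode._postprocess import InvalidStep, parse_steps

    pp = parse_steps(["domain", "20"])  # e.g. from a column spec like "dn:domain:20"
    print(pp.process("CN=jdoe,OU=people,DC=example,DC=com"))  # -> "example.com"

    try:
        parse_steps(["nosuchstep"])
    except InvalidStep as e:
        print(e)  # Unknown post-processing step 'nosuchstep'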

@@ -1,47 +1,78 @@
 from __future__ import annotations
 
+import argparse
 import dataclasses
 
 from ldaptool._utils import argclasses
 
+from . import _postprocess
+
+
+def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
+    parser.add_argument(
+        metavar="attributes",
+        dest=dest,
+        nargs="*",
+        help="""
+            Attributes to lookup (and columns to display in tables).
+            Fake attributes `dndomain`, `dnpath` and `dnfullpath` are available (created from dn).
+        """,
+    )
+
 
 @dataclasses.dataclass(slots=True, kw_only=True)
 class Arguments(argclasses.BaseArguments):
-    json: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Use full json output"),
-    )
-    human: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
-    )
+    columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
+    columns_keys: list[str] = dataclasses.field(default_factory=list)  # lower case column names
+    attributes: list[str] = dataclasses.field(default_factory=list)
     human_separator: str = dataclasses.field(
         default=", ",
         metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
     )
     dateonly: bool = dataclasses.field(
         default=True,
         metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
     )
-    dndomain: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"),
-    )
-    dnpath: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="""
-                Whether to export a virtual dnpath attribute
-                ('/' joined values of reversed DN without DNS labels)
-            """
-        ),
-    )
-    dnfullpath: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="""
-                Whether to export a virtual dnfullpath attribute
-                ('/' joined values of reversed DN; DNS domain as first label)
-            """
-        ),
-    )
+    post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)
+
+    def __post_init__(self) -> None:
+        super(Arguments, self).__post_init__()  # super() not working here, unclear why.
+
+        # extract special attribute names
+        all_attributes = False
+        attributes_set: set[str] = set()
+        self.columns_keys = []
+        for column in list(self.columns):
+            column = column.lower()
+
+            if column == "*":
+                # '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
+                self.columns.remove("*")
+                all_attributes = True
+                continue
+
+            self.columns_keys.append(column)
+
+            if column == "dndomain":
+                self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"])
+                attributes_set.add("dn")
+            elif column == "dnpath":
+                self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"])
+                attributes_set.add("dn")
+            elif column == "dnfullpath":
+                self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"])
+                attributes_set.add("dn")
+            else:
+                step_names = column.split(":")
+                attributes_set.add(step_names[0])
+                if len(step_names) > 1:
+                    source = step_names.pop(0)
+                    self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names)
+
+        if all_attributes:
+            self.attributes = []
+        else:
+            self.attributes = list(attributes_set)

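The reworked __post_init__ derives everything from the positional column list: columns_keys keeps the lowercased display order, attributes collects the underlying LDAP attributes to request, and post_process maps source attribute to output column to processing steps. A hypothetical construction (bypassing argparse) to show the bookkeeping:

    from ldaptool.decode import Arguments

    args = Arguments(columns=["cn", "dnDomain", "mail:20"])
    print(args.columns_keys)        # ['cn', 'dndomain', 'mail:20']
    print(sorted(args.attributes))  # ['cn', 'dn', 'mail'] - 'dn' pulled in by dndomain
    print(list(args.post_process))  # ['dn', 'mail'] - steps keyed by source attribute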

@@ -1,6 +1,5 @@
 from __future__ import annotations
 
-import argparse
 import dataclasses
 import typing
 
@@ -8,28 +7,8 @@ import ldaptool.decode.arguments
 from ldaptool._utils import argclasses
 
 
-def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
-    parser.add_argument(
-        metavar="attributes",
-        dest=dest,
-        nargs="*",
-        help="""
-            Attributes to lookup (and columns to display in tables).
-            Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
-        """,
-    )
-
-
 @dataclasses.dataclass(slots=True, kw_only=True)
 class Arguments(ldaptool.decode.arguments.Arguments):
-    # overwrite fields for fake attributes to remove them from argparse;
-    # we enable those based on the attribute list
-    dndomain: bool = False
-    dnpath: bool = False
-    dnfullpath: bool = False
-    attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
-    columns: list[str] = dataclasses.field(default_factory=list)
-
     filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
     find: typing.Optional[str] = dataclasses.field(
         default=None,
@@ -75,6 +54,8 @@ class Arguments(ldaptool.decode.arguments.Arguments):
     )
 
     def __post_init__(self) -> None:
+        super(Arguments, self).__post_init__()  # super() not working here, unclear why.
+
         if not self.filter is None:
             if not self.find is None:
                 raise SystemExit("Can't use both --find and --filter")
@@ -86,19 +67,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):
         else:
             # probably doesn't like empty filter?
             self.filter = "(objectClass=*)"
-
-        # extract special attribute names
-        self.columns = self.attributes  # use all names for columns (headings and their order)
-        attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes}  # index by lowercase name
-        # create fake attributes on demand
-        if attributes_set.pop("dndomain", ""):
-            self.dndomain = True
-        if attributes_set.pop("dnpath", ""):
-            self.dnpath = True
-        if attributes_set.pop("dnfullpath", ""):
-            self.dnfullpath = True
-        # store remaining attributes (with original case)
-        self.attributes = list(attributes_set.values())
-        if self.columns and not self.attributes:
-            # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
-            self.attributes = ["dn"]