6 Commits

SHA1        Message                                                           Date
d9803c226e  ldaptool 0.3-1                                                    2023-04-28 19:32:12 +02:00
cbcdb36579  debian: require csvkit (markdown table is an essential feature)   2023-04-28 19:31:23 +02:00
54a23e8060  Merge tag 'ldaptool-0.3' into debian                              2023-04-28 19:30:40 +02:00
d51d714352  ldaptool 0.2-1                                                    2023-04-28 17:21:48 +02:00
474ee9383f  Merge tag 'ldaptool-0.2' into debian                              2023-04-28 17:21:18 +02:00
71ab3043f4  debian packaging                                                  2023-04-28 14:41:50 +02:00
14 changed files with 237 additions and 243 deletions

debian/changelog (vendored, new file, 29 lines)

@@ -0,0 +1,29 @@
ldaptool (0.3-1) unstable; urgency=medium

  * ldaptool: move output arguments from search to main
  * run sort internally, refactor table output into separate method
  * refactor table variant handling
  * add html output format
  * README.md: document csvkit dependency
  * debian: require csvkit (markdown table is an essential feature)

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 19:31:37 +0200

ldaptool (0.2-1) unstable; urgency=medium

  * README.md: fix typo
  * enable tls unless kerberos is used (SASL GSS-API doesn't seem to work over TLS)

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 17:21:35 +0200

ldaptool (0.1-1) unstable; urgency=medium

  * Initial release.

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:30 +0200

ldaptool (0.1-0) unstable; urgency=medium

  * Stub ITP lintian.

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:29 +0200

debian/control (vendored, new file, 43 lines)

@@ -0,0 +1,43 @@
Source: ldaptool
Section: net
Priority: optional
Maintainer: Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
Rules-Requires-Root: no
Build-Depends:
 debhelper-compat (= 13),
 pybuild-plugin-pyproject,
 flit,
 dh-sequence-python3,
 python3,
 python3-ldap,
 python3-yaml,
 python3-pykeepass,
#Testsuite: autopkgtest-pkg-python
Standards-Version: 4.6.2
Homepage: https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool

Package: python3-ldaptool
Architecture: all
Depends:
 ${python3:Depends},
 ${misc:Depends},
Recommends:
 python3-pykeepass,
Description: CLI tool to run ldap queries
 CLI tool to query LDAP/AD servers, featuring various output formats
 and a configuration for different realms.
 .
 This package installs the library for Python 3.

Package: ldaptool
Architecture: all
Depends:
 python3-ldaptool (=${binary:Version}),
 ${python3:Depends},
 ${misc:Depends},
 csvkit,
Description: CLI tool to run ldap queries
 CLI tool to query LDAP/AD servers, featuring various output formats
 and a configuration for different realms.
 .
 This package installs the script.

debian/copyright (vendored, new file, 27 lines)

@@ -0,0 +1,27 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Source: <https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool>
Upstream-Name: ldaptool

Files:
 *
Copyright:
 2023 Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
 2023 Daniel Dizdarevic <daniel.dizdarevic@tik.uni-stuttgart.de>
License: MIT
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
 in the Software without restriction, including without limitation the rights
 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 copies of the Software, and to permit persons to whom the Software is
 furnished to do so, subject to the following conditions:
 .
 The above copyright notice and this permission notice shall be included in
 all copies or substantial portions of the Software.
 .
 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 THE SOFTWARE.

debian/gbp.conf (vendored, new file, 5 lines)

@@ -0,0 +1,5 @@
[DEFAULT]
pristine-tar = False
upstream-branch = main
debian-branch = debian
upstream-tag = ldaptool-%(version)s
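With this configuration, git-buildpackage builds from the debian branch, merges upstream code from main, and locates upstream releases via the ldaptool-<version> git tags rather than pristine-tar tarballs.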

debian/rules (vendored, new executable file, 13 lines)

@@ -0,0 +1,13 @@
#!/usr/bin/make -f

export PYBUILD_NAME=ldaptool

%:
	dh $@ --buildsystem=pybuild

# we want /usr/bin/ldaptool in a separate package
override_dh_auto_install:
	dh_auto_install
	mkdir -p debian/ldaptool/usr
	mv debian/python3-ldaptool/usr/bin debian/ldaptool/usr/

debian/source/format (vendored, new file, 1 line)

@@ -0,0 +1 @@
3.0 (quilt)

debian/source/options (vendored, new file, 1 line)

@@ -0,0 +1 @@
extend-diff-ignore = "^[^/]*[.]egg-info/|^[.]vscode|/__pycache__/|^venv/|^.mypy_cache/"

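A quick way to sanity-check what that pattern excludes from the Debian diff (a sketch; dpkg-source treats the value as a Perl-style regex, but Python's re agrees for this pattern, and the example paths are made up):

import re

# extend-diff-ignore value from debian/source/options above
pattern = re.compile(r"^[^/]*[.]egg-info/|^[.]vscode|/__pycache__/|^venv/|^.mypy_cache/")

# paths that are ignored in the source diff:
assert pattern.search("ldaptool.egg-info/PKG-INFO")
assert pattern.search(".vscode/settings.json")
assert pattern.search("src/ldaptool/__pycache__/cli.cpython-311.pyc")
assert pattern.search("venv/bin/python3")
# ...and one that is kept:
assert not pattern.search("src/ldaptool/__init__.py")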
(modified source file; name not shown)

@@ -14,7 +14,7 @@ from ldaptool._utils import argclasses
from ldaptool._utils.ldap import Result, SizeLimitExceeded
class TableOutput(enum.Enum):
class TableOutput(enum.StrEnum):
MARKDOWN = "markdown"
CSV = "csv"
HTML = "html"
@@ -42,27 +42,19 @@ class Arguments(search.Arguments):
help="Markdown table output - requires list of attributes",
),
)
table_output: typing.Optional[TableOutput] = None
html: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="HTML table output - requires list of attributes",
),
)
table_output: typing.Optional[TableOutput] = None
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use full json output"),
)
human: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
@@ -106,10 +98,7 @@ class _Context:
self.config = search.Config.load()
except Exception as e:
raise SystemExit(f"config error: {e}")
try:
self.arguments = arguments_p.from_args(args)
except decode.InvalidStep as e:
raise SystemExit(f"invalid arguments: {e}")
def run(self) -> None:
# starting the search sets the base we want to print
@@ -152,7 +141,7 @@ class _Context:
continue
# normal entry
assert not isinstance(entry, list)
obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
yield tuple(obj.get(key, "") for key in column_keys)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")
@@ -212,13 +201,8 @@ class _Context:
# normal entry
assert not isinstance(entry, list)
num_entries += 1
if ldif_output:
decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
elif self.arguments.human:
decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
else:
assert self.arguments.json
decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
obj = decoder.read(dn=dn, entry=entry)
decoder.emit(dn=dn, entry=obj)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")

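For context on the enum.Enum -> enum.StrEnum change above: StrEnum members (Python 3.11+) are themselves strings, so the table-output value can be compared and printed without .value plumbing. A minimal illustration (not from the repo):

import enum

class TableOutput(enum.StrEnum):
    MARKDOWN = "markdown"
    CSV = "csv"
    HTML = "html"

# StrEnum members are str instances; plain enum.Enum members are not
assert TableOutput.CSV == "csv"
assert isinstance(TableOutput.CSV, str)
assert f"output: {TableOutput.HTML}" == "output: html"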
(modified source file; name not shown)

@@ -74,9 +74,6 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")
@dataclasses.dataclass(slots=True, kw_only=True)
class BaseArguments:
def __post_init__(self) -> None:
pass
@classmethod
def add_fields_to_parser(
cls: type[_TArgs],

(modified source file; name not shown)

@@ -1,12 +1,10 @@
from __future__ import annotations
from ._decoder import Attribute, Decoder
from ._postprocess import InvalidStep
from .arguments import Arguments
__all__ = [
"Arguments",
"Attribute",
"Decoder",
"InvalidStep",
]

(modified source file; name not shown)

@@ -8,6 +8,8 @@ import sys
import typing
import uuid
from ldaptool._utils.dninfo import DNInfo
from . import _types
from .arguments import Arguments
@@ -120,16 +122,16 @@ class Attribute:
def _base64_value(self) -> str:
return base64.b64encode(self.raw).decode("ascii")
def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
def print(self) -> None:
if not self.decoded is None:
comment = self.utf8_clean
if comment is None:
comment = self._base64_value
print(f"{self.name}: {self.decoded} # {comment}", file=file)
print(f"{self.name}: {self.decoded} # {comment}")
elif not self.utf8_clean is None:
print(f"{self.name}: {self.utf8_clean}", file=file)
print(f"{self.name}: {self.utf8_clean}")
else:
print(f"{self.name}:: {self._base64_value}", file=file)
print(f"{self.name}:: {self._base64_value}")
def to_json(self) -> dict[str, typing.Any]:
item: dict[str, typing.Any] = {}
@@ -173,69 +175,55 @@ class Decoder:
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
for name, raw_values in entry.items()
}
for attr, post_processes in self.arguments.post_process.items():
if attr == "dn":
values = [dn]
else:
attrs = decoded_entry.get(attr, None)
if attrs is None:
continue
values = [at.human() for at in attrs]
for column, post_process in post_processes.items():
decoded_entry[column] = [
Attribute.fake_attribute(column, post_process.process(value)) for value in values
if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
dninfo = DNInfo(dn=dn)
if self.arguments.dndomain:
decoded_entry["dndomain"] = [
Attribute.fake_attribute("dndomain", dninfo.domain),
]
if self.arguments.dnpath:
decoded_entry["dnpath"] = [
Attribute.fake_attribute("dnpath", dninfo.path),
]
if self.arguments.dnfullpath:
decoded_entry["dnfullpath"] = [
Attribute.fake_attribute("dnfullpath", dninfo.full_path),
]
return decoded_entry
def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in obj.items():
for name, attrs in entry.items():
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
return emit
def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
emit = self.human(dn=dn, obj=obj)
json.dump(emit, file, ensure_ascii=False)
print(file=file) # terminate output dicts by newline
def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in obj.items():
for name, attrs in entry.items():
emit[name] = [attr.to_json() for attr in attrs]
return emit
def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
emit = self.json(dn=dn, obj=obj)
json.dump(emit, file, ensure_ascii=False)
print(file=file) # terminate output dicts by newline
def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
print(f"dn: {dn}", file=file)
attrs: typing.Optional[list[Attribute]]
if not self.arguments.attributes:
# show all attributes - use order from server
for attrs in obj.values():
for attr in attrs:
attr.print(file=file)
def _emit_json(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human:
emit = self.human(dn=dn, entry=entry)
else:
# only selected columns; use given order
for column in self.arguments.columns_keys:
if column == "dn":
continue # already printed dn
attrs = obj.get(column, None)
if attrs is None:
continue
for attr in attrs:
attr.print(file=file)
print(file=file) # separate entries with newlines
emit = self.json(dn=dn, entry=entry)
json.dump(emit, sys.stdout, ensure_ascii=False)
print() # terminate output dicts by newline
def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None:
print(f"dn: {dn}")
for attrs in entry.values():
for attr in attrs:
attr.print()
print() # separate entries with newlines
def emit(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human or self.arguments.json:
self._emit_json(dn=dn, entry=entry)
else:
self._emit_ldif(dn=dn, entry=entry)
def handle(self, *, dn: str, entry: TEntry) -> None:
entry_attrs = self.read(dn=dn, entry=entry)
self.emit(dn=dn, entry=entry_attrs)

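The consolidated _emit_json above produces JSON-Lines style output: one dict per entry, dumped without ASCII escaping and terminated by a newline. The pattern in isolation (sample entry made up):

import json
import sys

entry = {"dn": "CN=Stefan Bühler,DC=example,DC=org", "cn": "Stefan Bühler"}
json.dump(entry, sys.stdout, ensure_ascii=False)  # keeps non-ASCII values readable
print()  # terminate each output dict with a newline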
(deleted source file; name not shown)

@@ -1,96 +0,0 @@
from __future__ import annotations
import abc
import dataclasses
from ldaptool._utils.dninfo import DNInfo
class Step(abc.ABC):
__slots__ = ()
@abc.abstractmethod
def step(self, value: str) -> str:
...
@dataclasses.dataclass(slots=True)
class MaxLength(Step):
limit: int
def step(self, value: str) -> str:
if not self.limit or len(value) <= self.limit:
return value
return value[: self.limit - 1] + "…"
@dataclasses.dataclass(slots=True)
class DNDomain(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.domain
@dataclasses.dataclass(slots=True)
class DNPath(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.path
@dataclasses.dataclass(slots=True)
class DNFullPath(Step):
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.full_path
_STEPS = {
"domain": DNDomain(),
"path": DNPath(),
"fullpath": DNFullPath(),
}
class InvalidStep(Exception):
pass
@dataclasses.dataclass(slots=True)
class PostProcess:
steps: list[Step]
def process(self, value: str) -> str:
for step in self.steps:
value = step.step(value)
return value
def parse_steps(steps: list[str]) -> PostProcess:
max_len = 0
try:
max_len = int(steps[-1])
steps.pop()
except ValueError:
pass
result = []
for step in steps:
step_i = _STEPS.get(step, None)
if step_i is None:
raise InvalidStep(f"Unknown post-processing step {step!r}")
result.append(step_i)
if max_len:
result.append(MaxLength(max_len))
return PostProcess(result)

(modified source file; name not shown)

@@ -1,78 +1,47 @@
from __future__ import annotations
import argparse
import dataclasses
from ldaptool._utils import argclasses
from . import _postprocess
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` and `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(argclasses.BaseArguments):
columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names
attributes: list[str] = dataclasses.field(default_factory=list)
json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use full json output"),
)
human: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
)
human_separator: str = dataclasses.field(
default=", ",
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
)
dateonly: bool = dataclasses.field(
default=True,
metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
)
post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
# extract special attribute names
all_attributes = False
attributes_set: set[str] = set()
self.columns_keys = []
for column in list(self.columns):
column = column.lower()
if column == "*":
# '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
self.columns.remove("*")
all_attributes = True
continue
self.columns_keys.append(column)
if column == "dndomain":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"])
attributes_set.add("dn")
elif column == "dnpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"])
attributes_set.add("dn")
elif column == "dnfullpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"])
attributes_set.add("dn")
else:
step_names = column.split(":")
attributes_set.add(step_names[0])
if len(step_names) > 1:
source = step_names.pop(0)
self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names)
if all_attributes:
self.attributes = []
else:
self.attributes = list(attributes_set)
dndomain: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"),
)
dnpath: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="""
Whether to export a virtual dnpath attribute
('/' joined values of reversed DN without DNS labels)
"""
),
)
dnfullpath: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="""
Whether to export a virtual dnfullpath attribute
('/' joined values of reversed DN; DNS domain as first label)
"""
),
)

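Going by the help texts above, the virtual attributes derive from the DN roughly as follows (a sketch with a made-up DN; the naive comma split ignores DN escaping, which a real parser such as DNInfo presumably handles):

dn = "CN=web,OU=hosts,DC=example,DC=org"
rdns = [part.split("=", 1) for part in dn.split(",")]

dns_labels = [value for key, value in rdns if key.lower() == "dc"]
others = [value for key, value in rdns if key.lower() != "dc"]

dndomain = ".".join(dns_labels)                       # "example.org"
dnpath = "/".join(reversed(others))                   # "hosts/web"
dnfullpath = "/".join([dndomain, *reversed(others)])  # "example.org/hosts/web"

assert (dndomain, dnpath, dnfullpath) == ("example.org", "hosts/web", "example.org/hosts/web")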
(modified source file; name not shown)

@@ -1,5 +1,6 @@
from __future__ import annotations
import argparse
import dataclasses
import typing
@@ -7,8 +8,28 @@ import ldaptool.decode.arguments
from ldaptool._utils import argclasses
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` and `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(ldaptool.decode.arguments.Arguments):
# overwrite fields for fake attributes to remove them from argparse;
# we enable those based on the attribute list
dndomain: bool = False
dnpath: bool = False
dnfullpath: bool = False
attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
columns: list[str] = dataclasses.field(default_factory=list)
filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
find: typing.Optional[str] = dataclasses.field(
default=None,
@@ -54,8 +75,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
if not self.filter is None:
if not self.find is None:
raise SystemExit("Can't use both --find and --filter")
@@ -67,3 +86,19 @@ class Arguments(ldaptool.decode.arguments.Arguments):
else:
# probably doesn't like empty filter?
self.filter = "(objectClass=*)"
# extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
# create fake attributes on demand
if attributes_set.pop("dndomain", ""):
self.dndomain = True
if attributes_set.pop("dnpath", ""):
self.dnpath = True
if attributes_set.pop("dnfullpath", ""):
self.dnfullpath = True
# store remaining attributes (with original case)
self.attributes = list(attributes_set.values())
if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"]
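The attribute handling added in this hunk indexes the requested names by their lowercase form, so the fake attributes match case-insensitively while the surviving attributes keep their original spelling. The core of the pattern (names are illustrative):

requested = ["cn", "DNDomain", "sAMAccountName"]

index = {name.lower(): name for name in requested}  # lowercase -> original case
dndomain = bool(index.pop("dndomain", ""))          # consumes "DNDomain" if present
attributes = list(index.values())                   # remaining, original case

assert dndomain
assert attributes == ["cn", "sAMAccountName"]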