12 Commits

17 changed files with 276 additions and 208 deletions


@@ -9,7 +9,7 @@ CLI tool to query LDAP/AD servers
* Integration with password managers
* Various output formats
  * Classic LDIF
-  * JSON stream (with detailed or simplified attribute values)
+  * JSON stream (with simplified or detailed attribute values)
  * CSV
  * Markdown table with stretched columns (for viewing in CLI/for monospaced fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
  * HTML
@@ -18,6 +18,34 @@ CLI tool to query LDAP/AD servers
* By default the first 1000 entries are shown, and it errors if there are more results
  * Use `--all` to show all results
## Virtual attributes
`ldaptool` supports constructing new values from existing attributes by adding a `:<postprocess>` suffix (suffixes can be chained, apart from the length limit).
* Some suffixes support an argument as `:<postprocess>[<arg>]`.
* A single integer as postprocess suffix limits the length of the value; it replaces the last character of the output with `…` if it cut something off.
* Multi-valued attributes generate multiple virtual attributes; each value is processed individually. (The values are joined afterwards for table output if needed.)
### DN handling
DNs are decoded into lists of lists of `(name, value)` pairs (the inner list usually contains exactly one entry).
Attributes with a `DC` name are considered part of the "domain", everything else belongs to the "path".
(Usually a DN will start with path segments and end with domain segments.)
The path is read from back to front.
The following postprocess hooks are available:
* `domain`: extracts the domain as DNS FQDN (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com`)
* `path`: extracts the non-domain parts without names and separates them by `/` (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `Dep1/Someone`)
* `fullpath`: uses the `domain` as first segment in a path (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com/Dep1/Someone`)
* `dnslice`: extracts a "slice" from a DN (outer list only); the result is still in DN format.
`path`, `fullpath` and `dnslice` take an optional index/slice as argument, written in python syntax.
For `path` and `fullpath` this extracts only the given index/slice from the path (`fullpath` always includes the full FQDN as first segment), `dnslice` operates on the outer list of decoded (lists of) pairs:
* `dn:dnslice[1:]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `OU=Dep1,DC=example,DC=com`
* `dn:fullpath[:-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `example.com/Dep1`
* `dn:path[-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `Someone`
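The hook semantics above can be sketched with a few stdlib-only helpers (hypothetical names; naive `,`/`=` splitting that ignores DN escaping — ldaptool itself parses DNs via python-ldap):

```python
# Minimal sketch of the domain/path/fullpath hooks; assumes no escaped
# commas or equals signs in the DN (real parsing uses python-ldap).
def dn_parts(dn: str) -> list:
    return [tuple(rdn.split("=", 1)) for rdn in dn.split(",")]

def domain(dn: str) -> str:
    # join all DC values with dots
    return ".".join(v for n, v in dn_parts(dn) if n.lower() == "dc")

def path(dn: str, sel: slice = slice(None)) -> str:
    # non-DC values, read back to front, then sliced
    rev = [v for n, v in reversed(dn_parts(dn)) if n.lower() != "dc"]
    return "/".join(rev[sel])

def fullpath(dn: str, sel: slice = slice(None)) -> str:
    d, p = domain(dn), path(dn, sel)
    return f"{d}/{p}" if d and p else d or p

dn = "CN=Someone,OU=Dep1,DC=example,DC=com"
assert domain(dn) == "example.com"
assert path(dn) == "Dep1/Someone"
assert path(dn, slice(-1, None)) == "Someone"               # path[-1]
assert fullpath(dn, slice(None, -1)) == "example.com/Dep1"  # fullpath[:-1]
```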
## Authentication, Protocol, Ports
`ldaptool` always uses TLS for password-based authentication, and SASL GSS-API over non-TLS for Kerberos authentication.

debian/changelog

@@ -1,38 +0,0 @@
ldaptool (0.4-1) unstable; urgency=medium

  * move argument/column handling to decoder (prepare for more post-processing in decoder)
  * move json output format handling to main tool from decoder
  * support attribute post-processing; :<len>, and DN :domain, :path, :fullpath
  * use Enum instead of StrEnum for python3.10

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Tue, 02 May 2023 16:54:00 +0200

ldaptool (0.3-1) unstable; urgency=medium

  * ldaptool: move output arguments from search to main
  * run sort internally, refactor table output into separate method
  * refactor table variant handling
  * add html output format
  * README.md: document csvkit dependency
  * debian: require csvkit (markdown table is an essential feature)

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 19:31:37 +0200

ldaptool (0.2-1) unstable; urgency=medium

  * README.md: fix typo
  * enable tls unless kerberos is used (SASL GSS-API doesn't seem to work over TLS)

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 17:21:35 +0200

ldaptool (0.1-1) unstable; urgency=medium

  * Initial release.

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:30 +0200

ldaptool (0.1-0) unstable; urgency=medium

  * Stub ITP lintian.

 -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:29 +0200

debian/control

@@ -1,43 +0,0 @@
Source: ldaptool
Section: net
Priority: optional
Maintainer: Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
Rules-Requires-Root: no
Build-Depends:
 debhelper-compat (= 13),
 pybuild-plugin-pyproject,
 flit,
 dh-sequence-python3,
 python3,
 python3-ldap,
 python3-yaml,
 python3-pykeepass,
#Testsuite: autopkgtest-pkg-python
Standards-Version: 4.6.2
Homepage: https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool

Package: python3-ldaptool
Architecture: all
Depends:
 ${python3:Depends},
 ${misc:Depends},
Recommends:
 python3-pykeepass,
Description: CLI tool to run ldap queries
 CLI tool to query LDAP/AD servers, featuring various output formats
 and a configuration for different realms.
 .
 This package installs the library for Python 3.

Package: ldaptool
Architecture: all
Depends:
 python3-ldaptool (=${binary:Version}),
 ${python3:Depends},
 ${misc:Depends},
 csvkit,
Description: CLI tool to run ldap queries
 CLI tool to query LDAP/AD servers, featuring various output formats
 and a configuration for different realms.
 .
 This package installs the script.

debian/copyright

@@ -1,27 +0,0 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Source: <https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool>
Upstream-Name: ldaptool
Files:
*
Copyright:
2023 Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
2023 Daniel Dizdarevic <daniel.dizdarevic@tik.uni-stuttgart.de>
License: MIT
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
.
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

debian/gbp.conf

@@ -1,5 +0,0 @@
[DEFAULT]
pristine-tar = False
upstream-branch = main
debian-branch = debian
upstream-tag = ldaptool-%(version)s

debian/rules

@@ -1,13 +0,0 @@
#!/usr/bin/make -f

export PYBUILD_NAME=ldaptool

%:
	dh $@ --buildsystem=pybuild

# we want /usr/bin/ldaptool in a separate package
override_dh_auto_install:
	dh_auto_install
	mkdir -p debian/ldaptool/usr
	mv debian/python3-ldaptool/usr/bin debian/ldaptool/usr/


@@ -1 +0,0 @@
3.0 (quilt)


@@ -1 +0,0 @@
extend-diff-ignore = "^[^/]*[.]egg-info/|^[.]vscode|/__pycache__/|^venv/|^.mypy_cache/"


@@ -16,7 +16,7 @@ classifiers = [
]
dynamic = ["version", "description"]
-requires-python = "~=3.11"
+requires-python = "~=3.10"
dependencies = [
    "python-ldap",
    "PyYAML",


@@ -55,13 +55,17 @@ class Arguments(search.Arguments):
                help="Sorted table output - defaults to markdown --table unless --csv is given",
            ),
        )
+    full_json: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="Use full json output (dn as str, attributes as list of dicts containing various representations)",
+        ),
+    )
    json: bool = dataclasses.field(
        default=False,
-        metadata=argclasses.arg(help="Use full json output"),
-    )
-    human: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
+        metadata=argclasses.arg(
+            help="Use simple json output (dn as str, attributes map to list of human-readable strings)",
+        ),
    )

    def __post_init__(self) -> None:
@@ -85,15 +89,15 @@ class Arguments(search.Arguments):
        if self.table_output:
            if not self.columns:
                raise SystemExit("Table output requires attributes")
-            if self.json:
+            if self.full_json:
                raise SystemExit("Can't use both table output and --json")
-            if self.human:
+            if self.json:
                raise SystemExit("Can't use both table output and --human")

        if self.raw:
            if self.table_output:
                raise SystemExit("Table output requires decode; --raw not allowed")
-            if self.json or self.human:
+            if self.full_json or self.json:
                raise SystemExit("Decode options require decode; --raw not allowed")
@@ -105,7 +109,7 @@ class _Context:
        try:
            self.config = search.Config.load()
        except Exception as e:
-            raise SystemExit(f"config error: {e}")
+            raise SystemExit(f"config error: {e!r}")
        try:
            self.arguments = arguments_p.from_args(args)
        except decode.InvalidStep as e:
@@ -183,7 +187,7 @@ class _Context:
        num_responses = 0
        num_entries = 0

-        ldif_output = not (self.arguments.json or self.arguments.human)
+        ldif_output = not (self.arguments.full_json or self.arguments.json)
        if ldif_output:
            print("# extended LDIF")
@@ -214,11 +218,11 @@ class _Context:
                    num_entries += 1
                    if ldif_output:
                        decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
-                    elif self.arguments.human:
-                        decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
+                    elif self.arguments.json:
+                        decoder.read_and_emit_simple_json(dn=dn, entry=entry, file=stream)
                    else:
-                        assert self.arguments.json
-                        decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
+                        assert self.arguments.full_json
+                        decoder.read_and_emit_full_json(dn=dn, entry=entry, file=stream)
        except SizeLimitExceeded as e:
            raise SystemExit(f"Error: {e}")


@@ -33,19 +33,26 @@ class DNInfo:
    def domain(self) -> str:
        return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc")

-    def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str:
-        return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc")
+    def _path(self, *, escape: typing.Callable[[str], str], sep: str, selection: slice = slice(None)) -> str:
+        rev_flattened = [ava[1] for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc"]
+        return sep.join(escape(value) for value in rev_flattened[selection])
+
+    def sliced_path(self, selection: slice, /) -> str:
+        return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/", selection=selection)

    @functools.cached_property
    def path(self) -> str:
-        return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/")
+        return self.sliced_path(slice(None))

-    @property
-    def full_path(self) -> str:
+    def sliced_full_path(self, selection: slice, /) -> str:
        domain = self.domain
-        path = self.path
+        path = self.sliced_path(selection)
        if not path:
            return self.domain
        if not domain:
            return self.path
        return f"{domain}/{path}"
+
+    @property
+    def full_path(self) -> str:
+        return self.sliced_full_path(slice(None))


@@ -100,8 +100,15 @@ class Attribute:
            except Exception:
                return

+    def _try_decode_grouptype(self) -> None:
+        if self.utf8_clean:
+            try:
+                self.decoded = _types.grouptype.parse(self.utf8_clean.strip())
+            except Exception:
+                return
+
    def _try_decode(self, args: Arguments) -> None:
-        if self.name in ("objectSid",):
+        if self.name in ("objectSid", "securityIdentifier"):
            self._try_decode_sid()
        elif self.name in ("msExchMailboxGuid", "objectGUID"):
            self._try_decode_uuid()
@@ -115,6 +122,8 @@ class Attribute:
            self._try_decode_timestamp(args)
        elif self.name == "userAccountControl":
            self._try_decode_uac()
+        elif self.name == "groupType":
+            self._try_decode_grouptype()

    @property
    def _base64_value(self) -> str:
@@ -195,27 +204,33 @@ class Decoder:
            emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
        return emit

-    def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
-        emit = self.human(dn=dn, obj=obj)
+    def simple_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
+        emit: dict[str, typing.Any] = dict(dn=dn)
+        for name, attrs in obj.items():
+            emit[name] = [attr.human() for attr in attrs]
+        return emit
+
+    def emit_simple_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.simple_json(dn=dn, obj=obj)
        json.dump(emit, file, ensure_ascii=False)
        print(file=file)  # terminate output dicts by newline

-    def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
-        self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
+    def read_and_emit_simple_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_simple_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

-    def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
+    def full_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
        emit: dict[str, typing.Any] = dict(dn=dn)
        for name, attrs in obj.items():
            emit[name] = [attr.to_json() for attr in attrs]
        return emit

-    def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
-        emit = self.json(dn=dn, obj=obj)
+    def emit_full_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.full_json(dn=dn, obj=obj)
        json.dump(emit, file, ensure_ascii=False)
        print(file=file)  # terminate output dicts by newline

-    def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
-        self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
+    def read_and_emit_full_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_full_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

    def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
        print(f"dn: {dn}", file=file)


@@ -2,6 +2,9 @@ from __future__ import annotations

import abc
import dataclasses
+import typing
+
+import ldap.dn

from ldaptool._utils.dninfo import DNInfo
@@ -14,6 +17,27 @@ class Step(abc.ABC):
        ...

+def _args_to_slice(args: str) -> slice:
+    args = args.strip()
+    if not args:
+        return slice(None)
+    params: list[typing.Optional[int]] = []
+    for arg in args.split(":"):
+        arg = arg.strip()
+        if arg:
+            params.append(int(arg))
+        else:
+            params.append(None)
+    if len(params) == 1:
+        assert isinstance(params[0], int)
+        ndx = params[0]
+        if ndx == -1:
+            return slice(ndx, None)  # from last element to end - still exactly one element
+        # this doesn't work for ndx == -1: slice(-1, 0) is always empty. otherwise it should return [ndx:][:1].
+        return slice(ndx, ndx + 1)
+    return slice(*params)
@dataclasses.dataclass(slots=True)
class MaxLength(Step):
    limit: int
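The `_args_to_slice` helper added in the hunk above maps a Python-style index/slice string to a `slice` object; the single-index case needs special handling for `-1` because `slice(-1, 0)` is always empty. A standalone sketch of the same mapping (hypothetical `args_to_slice` name):

```python
def args_to_slice(args: str) -> slice:
    # "" -> everything, "1:" -> slice(1, None), "2" -> exactly one element
    args = args.strip()
    if not args:
        return slice(None)
    params = [int(a) if a.strip() else None for a in args.split(":")]
    if len(params) == 1:
        ndx = params[0]
        if ndx == -1:
            return slice(-1, None)  # last element; slice(-1, 0) would be empty
        return slice(ndx, ndx + 1)  # exactly the element at ndx
    return slice(*params)
```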
@@ -26,6 +50,10 @@ class MaxLength(Step):

@dataclasses.dataclass(slots=True)
class DNDomain(Step):
+    def __init__(self, args: str) -> None:
+        if args:
+            raise ValueError(":domain doesn't support an argument")
+
    def step(self, value: str) -> str:
        try:
            dninfo = DNInfo(dn=value)
@@ -37,30 +65,57 @@

@dataclasses.dataclass(slots=True)
class DNPath(Step):
+    path_slice: slice
+
+    def __init__(self, args: str) -> None:
+        self.path_slice = _args_to_slice(args)
+
    def step(self, value: str) -> str:
        try:
            dninfo = DNInfo(dn=value)
        except Exception:
            # not a valid DN -> no processing
            return value
-        return dninfo.path
+        return dninfo.sliced_path(self.path_slice)

@dataclasses.dataclass(slots=True)
class DNFullPath(Step):
+    path_slice: slice
+
+    def __init__(self, args: str) -> None:
+        self.path_slice = _args_to_slice(args)
+
    def step(self, value: str) -> str:
        try:
            dninfo = DNInfo(dn=value)
        except Exception:
            # not a valid DN -> no processing
            return value
-        return dninfo.full_path
+        return dninfo.sliced_full_path(self.path_slice)

+@dataclasses.dataclass(slots=True)
+class DNSlice(Step):
+    slice: slice
+
+    def __init__(self, args: str) -> None:
+        self.slice = _args_to_slice(args)
+
+    def step(self, value: str) -> str:
+        try:
+            dninfo = DNInfo(dn=value)
+        except Exception:
+            # not a valid DN -> no processing
+            return value
+        return ldap.dn.dn2str(dninfo.parts[self.slice])  # type: ignore

-_STEPS = {
-    "domain": DNDomain(),
-    "path": DNPath(),
-    "fullpath": DNFullPath(),
+_STEPS: dict[str, typing.Callable[[str], Step]] = {
+    "domain": DNDomain,
+    "path": DNPath,
+    "fullpath": DNFullPath,
+    "dnslice": DNSlice,
}
@@ -78,19 +133,63 @@ class PostProcess:
        return value

-def parse_steps(steps: list[str]) -> PostProcess:
-    max_len = 0
-    try:
-        max_len = int(steps[-1])
-        steps.pop()
-    except ValueError:
-        pass
-    result = []
-    for step in steps:
-        step_i = _STEPS.get(step, None)
-        if step_i is None:
-            raise InvalidStep(f"Unknown post-processing step {step!r}")
-        result.append(step_i)
-    if max_len:
-        result.append(MaxLength(max_len))
+def parse_steps(steps: str) -> PostProcess:
+    result: list[Step] = []
+
+    cur_id_start = 0
+    cur_args_start = -1
+    current_id = ""
+    current_args = ""
+    count_brackets = 0
+    step_done = False
+
+    def handle_step() -> None:
+        nonlocal cur_id_start, cur_args_start, current_id, current_args, step_done
+        assert step_done
+        step_i = _STEPS.get(current_id, None)
+        if step_i is None:
+            try:
+                max_len = int(current_id)
+                result.append(MaxLength(max_len))
+            except ValueError:
+                raise InvalidStep(f"Unknown post-processing step {current_id!r}")
+        else:
+            result.append(step_i(current_args))
+        cur_id_start = pos + 1
+        cur_args_start = -1
+        current_id = ""
+        current_args = ""
+        step_done = False
+
+    for pos, char in enumerate(steps):
+        if step_done:
+            if char != ":":
+                raise InvalidStep(f"Require : after step, found {char!r} at pos {pos}")
+            handle_step()
+        elif char == "[":
+            if count_brackets == 0:
+                # end of identifier
+                current_id = steps[cur_id_start:pos]
+                cur_args_start = pos + 1
+            count_brackets += 1
+        elif char == "]":
+            count_brackets -= 1
+            if count_brackets == 0:
+                current_args = steps[cur_args_start:pos]
+                step_done = True
+        elif count_brackets:
+            continue
+        elif char == ":":
+            # end of step without bracket argument
+            current_id = steps[cur_id_start:pos]
+            step_done = True
+            handle_step()
+        elif not char.isalnum():
+            raise InvalidStep(f"Expecting either alphanumeric, ':' or '[', got {char!r} at {pos}")
+
+    if not step_done:
+        current_id = steps[cur_id_start:]
+        if current_id:
+            step_done = True
+    if step_done:
+        handle_step()
    return PostProcess(result)
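The character-by-character parser above exists because a plain `split(":")` would break on slice arguments that themselves contain `:`, e.g. `path[1:]:10`. A condensed sketch of the same tokenization (hypothetical `split_steps` helper, illustration only, no validation of step names):

```python
def split_steps(spec: str) -> list:
    # split "name[args]:name2:..." into (name, args) pairs,
    # ignoring ':' inside [...] brackets
    result = []
    name = args = ""
    depth = 0
    start = astart = 0
    for pos, char in enumerate(spec):
        if char == "[":
            if depth == 0:
                name = spec[start:pos]   # identifier ends at the bracket
                astart = pos + 1
            depth += 1
        elif char == "]":
            depth -= 1
            if depth == 0:
                args = spec[astart:pos]  # text between the outer brackets
        elif char == ":" and depth == 0:
            if not name:
                name = spec[start:pos]   # step had no bracket argument
            result.append((name, args))
            name = args = ""
            start = pos + 1
    if not name:
        name = spec[start:]
    if name or args:
        result.append((name, args))
    return result
```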


@@ -1,8 +1,9 @@
from __future__ import annotations

-from . import sid, timestamp, uac
+from . import grouptype, sid, timestamp, uac

__all__ = [
+    "grouptype",
    "sid",
    "timestamp",
    "uac",


@@ -0,0 +1,29 @@
from __future__ import annotations

import enum
import typing


class GroupTypeFlags(enum.IntFlag):
    SYSTEM = 0x00000001
    SCOPE_GLOBAL = 0x00000002
    SCOPE_DOMAIN = 0x00000004
    SCOPE_UNIVERSAL = 0x00000008
    APP_BASIC = 0x00000010
    APP_QUERY = 0x00000020
    SECURITY = 0x80000000  # otherwise distribution

    def flags(self) -> list[GroupTypeFlags]:
        # ignore "uncovered" bits for now
        value = self.value
        members = []
        for member in GroupTypeFlags:
            member_value = member.value
            if member_value and member_value & value == member_value:
                members.append(member)
        return members


def parse(value: str) -> str:
    members = GroupTypeFlags(int(value)).flags()
    return ", ".join(typing.cast(str, member.name) for member in members)


@@ -57,20 +57,20 @@ class Arguments(argclasses.BaseArguments):
                self.columns_keys.append(column)
                if column == "dndomain":
-                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"])
+                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("domain")
                    attributes_set.add("dn")
                elif column == "dnpath":
-                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"])
+                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("path")
                    attributes_set.add("dn")
                elif column == "dnfullpath":
-                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"])
+                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("fullpath")
                    attributes_set.add("dn")
                else:
-                    step_names = column.split(":")
-                    attributes_set.add(step_names[0])
-                    if len(step_names) > 1:
-                        source = step_names.pop(0)
-                        self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names)
+                    col_parts = column.split(":", maxsplit=1)
+                    attributes_set.add(col_parts[0])
+                    if len(col_parts) == 2:
+                        source, steps = col_parts
+                        self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(steps)

        if all_attributes:
            self.attributes = []


@@ -7,6 +7,7 @@ import os
import os.path
import shlex
import subprocess
+import sys
import typing

import yaml
@@ -28,13 +29,13 @@ class Realm:
    @staticmethod
    def load(name: str, data: typing.Any) -> Realm:
-        assert isinstance(data, dict)
-        domain = data.pop("domain")
-        servers = data.pop("servers").split()
-        forest_root_domain = data.pop("forest_root_domain", domain)
-        account = data.pop("account", None)
-        password_file = data.pop("password_file", None)
-        password_folder = data.pop("password_folder", None)
+        assert isinstance(data, dict), f"Realm section isn't a dictionary: {data!r}"
+        domain = data["domain"]
+        servers = data["servers"].split()
+        forest_root_domain = data.get("forest_root_domain", domain)
+        account = data.get("account", None)
+        password_file = data.get("password_file", None)
+        password_folder = data.get("password_folder", None)
        return Realm(
            name=name,
            domain=domain,
@@ -101,8 +102,8 @@ class Keyringer(PasswordManager):
    @staticmethod
    def load(data: typing.Any) -> Keyringer:
        assert isinstance(data, dict)
-        keyring = data.pop("keyring")
-        folder = data.pop("folder")
+        keyring = data["keyring"]
+        folder = data.get("folder", "")
        return Keyringer(keyring=keyring, folder=folder)

    def get_password(self, password_name: str) -> str:
@@ -145,9 +146,17 @@ class Keepass(PasswordManager):
    def get_password(self, password_name: str) -> str:
        import pykeepass  # already made sure it is available above

-        password = getpass.getpass(f"KeePass password for database {self.database}: ")
-        kp = pykeepass.PyKeePass(self.database, password=password)
+        while True:
+            try:
+                password = getpass.getpass(f"KeePass password for database {self.database}: ")
+                kp = pykeepass.PyKeePass(self.database, password=password)
+                break
+            except pykeepass.exceptions.CredentialsError:
+                print("Invalid password", file=sys.stderr)
        entry = kp.find_entries(username=password_name, first=True)
+        if not entry:
+            raise SystemExit(f"no KeePass entry for {password_name!r} found")
        return entry.password  # type: ignore
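The new loop keeps prompting until PyKeePass accepts the master password. The pattern in isolation (hypothetical `prompt_until_valid` helper; `ValueError` stands in for `pykeepass.exceptions.CredentialsError`):

```python
def prompt_until_valid(prompt, check, ask=input):
    """Keep prompting until `check` accepts the answer.

    `check` raises ValueError for a bad answer, mirroring how the hunk
    above retries on pykeepass' CredentialsError.
    """
    while True:
        answer = ask(prompt)
        try:
            return check(answer)
        except ValueError:
            print("Invalid password")
```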
@@ -190,8 +199,8 @@ class Config:
        with open(conf_path) as f:
            data = yaml.safe_load(f)
        assert isinstance(data, dict)
-        assert "realms" in data
-        realms_data = data.pop("realms")
+        assert "realms" in data, "Missing realms section in config"
+        realms_data = data["realms"]
        assert isinstance(realms_data, dict)
        realms = {}
        for name, realm_data in realms_data.items():
@@ -201,15 +210,15 @@ class Config:
        if "keyringer" in data:
            if password_manager:
                raise ValueError("Can only set a single password manager")
-            password_manager = Keyringer.load(data.pop("keyringer"))
+            password_manager = Keyringer.load(data["keyringer"])
        if "keepass" in data:
            if password_manager:
                raise ValueError("Can only set a single password manager")
-            password_manager = Keepass.load(data.pop("keepass"))
+            password_manager = Keepass.load(data["keepass"])
        if "password-script" in data:
            if password_manager:
                raise ValueError("Can only set a single password manager")
-            password_manager = PasswordScript.load(data.pop("password-script"))
+            password_manager = PasswordScript.load(data["password-script"])
        return Config(realms=realms, password_manager=password_manager)
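This file consistently switches from `data.pop(...)` to `data[...]`/`data.get(...)`. The practical difference: `pop` mutates the parsed YAML dict, so a second read of the same section would fail. A minimal illustration (hypothetical loader names, not ldaptool API):

```python
# Hypothetical loader pair illustrating the difference; not ldaptool API.
def load_destructive(data: dict) -> str:
    return data.pop("keyring")  # removes the key from the caller's dict

def load_readonly(data: dict) -> str:
    return data["keyring"]      # leaves the caller's dict untouched

config = {"keyring": "logins", "folder": ""}
assert load_readonly(config) == "logins"
assert "keyring" in config        # config can be read again
assert load_destructive(config) == "logins"
assert "keyring" not in config    # a second read would now raise KeyError
```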
@@ -220,7 +229,11 @@ class Config:
        """
        if realm.account is None:
            raise RuntimeError("Can't get password without account - should use kerberos instead")
+        try:
            if self.password_manager:
                return self.password_manager.get_password(realm.password_name)
            return getpass.getpass(f"Enter password for {realm.password_name}: ")
+        except (KeyboardInterrupt, EOFError):
+            raise SystemExit("Password prompt / retrieval aborted")