Compare commits

...

10 Commits

Author SHA1 Message Date
ca0aa23c27 ldaptool 0.5-1 2023-05-10 19:54:32 +02:00
8928973ee7 ldaptool-0.5
-----BEGIN PGP SIGNATURE-----
 
 iQJYBAABCgBCFiEEcdms641aWv8vJSXMIcx4mUG+20gFAmRb2iIkHHN0ZWZhbi5i
 dWVobGVyQHRpay51bmktc3R1dHRnYXJ0LmRlAAoJECHMeJlBvttIscwP/inWA7tN
 X0AvSGyRAd6Gfo6AsND+IH26gdCjHSTeq4EGf94Am7KSx2IPOplWDxcPm7Shvz3D
 Qz1zfsDXIp58mC5/mrwu2RMsrOAm2Xymu6QUgCzKrork1/QRzjGrhLaDsxFdGUt+
 8yPAx5YVUTOww6FjmaYQpTTQdpkZh7LxnToTTS8uh+90kjyJ5ttMoje2JL427rPh
 wpaioD2BJqHJncS7d+wI8UOyH6lnM8GyFaiJ8szMWAim8i4YlSh98/m4xHvTLxKU
 qA2h5rfisgQttuYrFvX1Bdb+TboFOjq0TsNE0Ot/2T5J72iERy9kuwNhtBQbDfvm
 B/n+DLOwIkzIWndVqP+UWMl0SKZNNuF09d4Ppo4E1LJJ1WFj1+1+cDO40t18SJGB
 vz6P8gCADt3OfuxBAUe+F6KeP7nDG+ghp6t8gWHrtz0E9tuq9BpA3UDVhcQsGCcI
 slwg9SukdZ/6DhgtGt5Da2YlDfWu+8HTcO2O/vsICAVQd/1Pe/Vh34DYARP2KKBD
 6/P4qTZURj2w/RK+Dg3XvUp9EjcZRoP/34b/JpvZj7k4sZFcMxi+jh/gEY+2rcg0
 qvJdZxfD5UENmGDI8d2W8nzqQ8Mhb1cTeFHK+Cy2wOZLhGhK5yB0DcPjALqHtgLD
 au9ib49D8kvE8XlafoRRv66aWwSjQkBJnGL5
 =ZgyQ
 -----END PGP SIGNATURE-----

Merge tag 'ldaptool-0.5' into debian

ldaptool-0.5
2023-05-10 19:53:44 +02:00
55deb40268 decode securityIdentifier attribute as SID 2023-05-10 19:53:03 +02:00
3b5f698ff5 add argument to postprocess steps and support index/slicing in DN-related hooks; document them 2023-05-10 19:52:44 +02:00
34fcd259ef improve config loading: don't modify dicts to allow yaml repeated nodes 2023-05-10 16:25:41 +02:00
f036713d71 improve some error messages 2023-05-10 16:23:32 +02:00
f1d57487be Catch CTRL+C and CTRL+D in password prompts 2023-05-10 16:15:09 +02:00
04fd42c63b Catch invalid passwords in keepass 2023-05-10 16:01:34 +02:00
1a9829b93b handle missing KeePass entry 2023-05-10 16:00:07 +02:00
21069e892e :Fix version requirement for python3.10 2023-05-02 17:47:11 +02:00
9 changed files with 223 additions and 56 deletions

View File

@ -18,6 +18,34 @@ CLI tool to query LDAP/AD servers
* By default the first 1000 entries are shown, and it errors if there are more results * By default the first 1000 entries are shown, and it errors if there are more results
* Use `--all` to show all results * Use `--all` to show all results
## Virtual attributes
`ldaptool` supports constructing new values from existing attributes by adding a `:<postprocess>` suffix (which can be chained apart from the length limit).
* Some suffixes support an argument as `:<postprocess>[<arg>]`.
* A single integer as postprocess suffix limits the length of the value; it replaces the last character of the output with `…` if it cut something off.
* Multi-valued attributes generate multiple virtual attrites; each value is processed individually. (The values are joined afterwards for table output if needed.)
### DN handling
DNs are decoded into lists of lists of `(name, value)` pairs (the inner list usually contains exactly one entry).
Attributes with a `DC` name are considered part of the "domain", everything else belongs to the "path".
(Usually a DN will start with path segments and end with domain segments.)
The path is read from back to front.
The following postprocess hooks are available:
* `domain`: extracts the domain as DNS FQDN (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com`)
* `path`: extracts the non-domain parts without names and separates them by `/` (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `Dep1/Someone`)
* `fullpath`: uses the `domain` as first segment in a path (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com/Dep1/Someone`)
* `dnslice`: extracts a "slice" from a DN (outer list only); the result is still in DN format.
`path`, `fullpath` and `dnslice` take an optional index/slice as argument, written in python syntax.
For `path` and `fullpath` this extracts only the given index/slice from the path (`fullpath` always includes the full FQDN as first segment), `dnslice` operates on the outer list of decoded (lists of) pairs:
* `dn:dnslice[1:]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `OU=Dep1,DC=example,DC=com`
* `dn:fullpath[:-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `example.com/Dep1`
* `dn:path[-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `Someone`
## Authentication, Protocol, Ports ## Authentication, Protocol, Ports
`ldaptool` always uses TLS for password based authentication, and SASL GSS-API over non-TLS for Kerberos ones. `ldaptool` always uses TLS for password based authentication, and SASL GSS-API over non-TLS for Kerberos ones.

20
debian/changelog vendored
View File

@ -1,3 +1,23 @@
ldaptool (0.5-1) unstable; urgency=medium
[ Daniel Dizdarevic ]
* :Fix version requirement for python3.10
[ Stefan Bühler ]
* handle missing KeePass entry
[ Daniel Dizdarevic ]
* Catch invalid passwords in keepass
* Catch CTRL+C and CTRL+D in password prompts
[ Stefan Bühler ]
* improve some error messages
* improve config loading: don't modify dicts to allow yaml repeated nodes
* add argument to postprocess steps and support index/slicing in DN-related hooks; document them
* decode securityIdentifier attribute as SID
-- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de> Wed, 10 May 2023 19:53:51 +0200
ldaptool (0.4-1) unstable; urgency=medium ldaptool (0.4-1) unstable; urgency=medium
* move argument/column handling to decoder (prepare for more post-processing in decoder) * move argument/column handling to decoder (prepare for more post-processing in decoder)

View File

@ -16,7 +16,7 @@ classifiers = [
] ]
dynamic = ["version", "description"] dynamic = ["version", "description"]
requires-python = "~=3.11" requires-python = "~=3.10"
dependencies = [ dependencies = [
"python-ldap", "python-ldap",
"PyYAML", "PyYAML",

View File

@ -105,7 +105,7 @@ class _Context:
try: try:
self.config = search.Config.load() self.config = search.Config.load()
except Exception as e: except Exception as e:
raise SystemExit(f"config error: {e}") raise SystemExit(f"config error: {e!r}")
try: try:
self.arguments = arguments_p.from_args(args) self.arguments = arguments_p.from_args(args)
except decode.InvalidStep as e: except decode.InvalidStep as e:

View File

@ -33,19 +33,26 @@ class DNInfo:
def domain(self) -> str: def domain(self) -> str:
return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc") return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc")
def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str: def _path(self, *, escape: typing.Callable[[str], str], sep: str, selection: slice = slice(None)) -> str:
return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc") rev_flattened = [ava[1] for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc"]
return sep.join(value for value in rev_flattened[selection])
def sliced_path(self, selection: slice, /) -> str:
return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/", selection=selection)
@functools.cached_property @functools.cached_property
def path(self) -> str: def path(self) -> str:
return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/") return self.sliced_path(slice(None))
@property def sliced_full_path(self, selection: slice, /) -> str:
def full_path(self) -> str:
domain = self.domain domain = self.domain
path = self.path path = self.sliced_path(selection)
if not path: if not path:
return self.domain return self.domain
if not domain: if not domain:
return self.path return self.path
return f"{domain}/{path}" return f"{domain}/{path}"
@property
def full_path(self) -> str:
return self.sliced_full_path(slice(None))

View File

@ -101,7 +101,7 @@ class Attribute:
return return
def _try_decode(self, args: Arguments) -> None: def _try_decode(self, args: Arguments) -> None:
if self.name in ("objectSid",): if self.name in ("objectSid","securityIdentifier"):
self._try_decode_sid() self._try_decode_sid()
elif self.name in ("msExchMailboxGuid", "objectGUID"): elif self.name in ("msExchMailboxGuid", "objectGUID"):
self._try_decode_uuid() self._try_decode_uuid()

View File

@ -2,6 +2,9 @@ from __future__ import annotations
import abc import abc
import dataclasses import dataclasses
import typing
import ldap.dn
from ldaptool._utils.dninfo import DNInfo from ldaptool._utils.dninfo import DNInfo
@ -14,6 +17,27 @@ class Step(abc.ABC):
... ...
def _args_to_slice(args: str) -> slice:
args = args.strip()
if not args:
return slice(None)
params: list[typing.Optional[int]] = []
for arg in args.split(":"):
arg = arg.strip()
if arg:
params.append(int(arg))
else:
params.append(None)
if len(params) == 1:
assert isinstance(params[0], int)
ndx = params[0]
if ndx == -1:
return slice(ndx, None) # from last element to end - still exactly one element
# this doesn't work for ndx == -1: slice(-1, 0) is always empty. otherwise it should return [ndx:][:1].
return slice(ndx, ndx + 1)
return slice(*params)
@dataclasses.dataclass(slots=True) @dataclasses.dataclass(slots=True)
class MaxLength(Step): class MaxLength(Step):
limit: int limit: int
@ -26,6 +50,10 @@ class MaxLength(Step):
@dataclasses.dataclass(slots=True) @dataclasses.dataclass(slots=True)
class DNDomain(Step): class DNDomain(Step):
def __init__(self, args: str) -> None:
if args:
raise ValueError(":domain doesn't support an argument")
def step(self, value: str) -> str: def step(self, value: str) -> str:
try: try:
dninfo = DNInfo(dn=value) dninfo = DNInfo(dn=value)
@ -37,30 +65,57 @@ class DNDomain(Step):
@dataclasses.dataclass(slots=True) @dataclasses.dataclass(slots=True)
class DNPath(Step): class DNPath(Step):
path_slice: slice
def __init__(self, args: str) -> None:
self.path_slice = _args_to_slice(args)
def step(self, value: str) -> str: def step(self, value: str) -> str:
try: try:
dninfo = DNInfo(dn=value) dninfo = DNInfo(dn=value)
except Exception: except Exception:
# not a valid DN -> no processing # not a valid DN -> no processing
return value return value
return dninfo.path return dninfo.sliced_path(self.path_slice)
@dataclasses.dataclass(slots=True) @dataclasses.dataclass(slots=True)
class DNFullPath(Step): class DNFullPath(Step):
path_slice: slice
def __init__(self, args: str) -> None:
self.path_slice = _args_to_slice(args)
def step(self, value: str) -> str: def step(self, value: str) -> str:
try: try:
dninfo = DNInfo(dn=value) dninfo = DNInfo(dn=value)
except Exception: except Exception:
# not a valid DN -> no processing # not a valid DN -> no processing
return value return value
return dninfo.full_path return dninfo.sliced_full_path(self.path_slice)
_STEPS = { @dataclasses.dataclass(slots=True)
"domain": DNDomain(), class DNSlice(Step):
"path": DNPath(), slice: slice
"fullpath": DNFullPath(),
def __init__(self, args: str) -> None:
self.slice = _args_to_slice(args)
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return ldap.dn.dn2str(dninfo.parts[self.slice]) # type: ignore
_STEPS: dict[str, typing.Callable[[str], Step]] = {
"domain": DNDomain,
"path": DNPath,
"fullpath": DNFullPath,
"dnslice": DNSlice,
} }
@ -78,19 +133,63 @@ class PostProcess:
return value return value
def parse_steps(steps: list[str]) -> PostProcess: def parse_steps(steps: str) -> PostProcess:
max_len = 0 result: list[Step] = []
try:
max_len = int(steps[-1]) cur_id_start = 0
steps.pop() cur_args_start = -1
except ValueError: current_id = ""
pass current_args = ""
result = [] count_brackets = 0
for step in steps: step_done = False
step_i = _STEPS.get(step, None)
def handle_step() -> None:
nonlocal cur_id_start, cur_args_start, current_id, current_args, step_done
assert step_done
step_i = _STEPS.get(current_id, None)
if step_i is None: if step_i is None:
raise InvalidStep(f"Unknown post-processing step {step!r}") try:
result.append(step_i) max_len = int(current_id)
if max_len: result.append(MaxLength(max_len))
result.append(MaxLength(max_len)) except ValueError:
raise InvalidStep(f"Unknown post-processing step {current_id!r}")
else:
result.append(step_i(current_args))
cur_id_start = pos + 1
cur_args_start = -1
current_id = ""
current_args = ""
step_done = False
for pos, char in enumerate(steps):
if step_done:
if char != ":":
raise InvalidStep(f"Require : after step, found {char!r} at pos {pos}")
handle_step()
elif char == "[":
if count_brackets == 0:
# end of identifier
current_id = steps[cur_id_start:pos]
cur_args_start = pos + 1
count_brackets += 1
elif char == "]":
count_brackets -= 1
if count_brackets == 0:
current_args = steps[cur_args_start:pos]
step_done = True
elif count_brackets:
continue
elif not char.isalnum():
raise InvalidStep(f"Expecting either alphanumeric, ':' or '[', got {char!r} at {pos}")
if not step_done:
current_id = steps[cur_id_start:]
if current_id:
step_done = True
if step_done:
handle_step()
return PostProcess(result) return PostProcess(result)

View File

@ -57,20 +57,20 @@ class Arguments(argclasses.BaseArguments):
self.columns_keys.append(column) self.columns_keys.append(column)
if column == "dndomain": if column == "dndomain":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"]) self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("domain")
attributes_set.add("dn") attributes_set.add("dn")
elif column == "dnpath": elif column == "dnpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"]) self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("path")
attributes_set.add("dn") attributes_set.add("dn")
elif column == "dnfullpath": elif column == "dnfullpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"]) self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("fullpath")
attributes_set.add("dn") attributes_set.add("dn")
else: else:
step_names = column.split(":") col_parts = column.split(":", maxsplit=1)
attributes_set.add(step_names[0]) attributes_set.add(col_parts[0])
if len(step_names) > 1: if len(col_parts) == 2:
source = step_names.pop(0) source, steps = col_parts
self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names) self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(steps)
if all_attributes: if all_attributes:
self.attributes = [] self.attributes = []

View File

@ -7,6 +7,7 @@ import os
import os.path import os.path
import shlex import shlex
import subprocess import subprocess
import sys
import typing import typing
import yaml import yaml
@ -28,13 +29,13 @@ class Realm:
@staticmethod @staticmethod
def load(name: str, data: typing.Any) -> Realm: def load(name: str, data: typing.Any) -> Realm:
assert isinstance(data, dict) assert isinstance(data, dict), f"Realm section isn't a dictionary: {data!r}"
domain = data.pop("domain") domain = data["domain"]
servers = data.pop("servers").split() servers = data["servers"].split()
forest_root_domain = data.pop("forest_root_domain", domain) forest_root_domain = data.get("forest_root_domain", domain)
account = data.pop("account", None) account = data.get("account", None)
password_file = data.pop("password_file", None) password_file = data.get("password_file", None)
password_folder = data.pop("password_folder", None) password_folder = data.get("password_folder", None)
return Realm( return Realm(
name=name, name=name,
domain=domain, domain=domain,
@ -101,8 +102,8 @@ class Keyringer(PasswordManager):
@staticmethod @staticmethod
def load(data: typing.Any) -> Keyringer: def load(data: typing.Any) -> Keyringer:
assert isinstance(data, dict) assert isinstance(data, dict)
keyring = data.pop("keyring") keyring = data["keyring"]
folder = data.pop("folder") folder = data.get("folder", "")
return Keyringer(keyring=keyring, folder=folder) return Keyringer(keyring=keyring, folder=folder)
def get_password(self, password_name: str) -> str: def get_password(self, password_name: str) -> str:
@ -145,9 +146,17 @@ class Keepass(PasswordManager):
def get_password(self, password_name: str) -> str: def get_password(self, password_name: str) -> str:
import pykeepass # already made sure it is avaiable above import pykeepass # already made sure it is avaiable above
password = getpass.getpass(f"KeePass password for database {self.database}: ") while True:
kp = pykeepass.PyKeePass(self.database, password=password) try:
password = getpass.getpass(f"KeePass password for database {self.database}: ")
kp = pykeepass.PyKeePass(self.database, password=password)
break
except pykeepass.exceptions.CredentialsError:
print("Invalid password", file=sys.stderr)
entry = kp.find_entries(username=password_name, first=True) entry = kp.find_entries(username=password_name, first=True)
if not entry:
raise SystemExit(f"no KeePass entry for {password_name!r} found")
return entry.password # type: ignore return entry.password # type: ignore
@ -190,8 +199,8 @@ class Config:
with open(conf_path) as f: with open(conf_path) as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
assert isinstance(data, dict) assert isinstance(data, dict)
assert "realms" in data assert "realms" in data, "Missing realms section in config"
realms_data = data.pop("realms") realms_data = data["realms"]
assert isinstance(realms_data, dict) assert isinstance(realms_data, dict)
realms = {} realms = {}
for name, realm_data in realms_data.items(): for name, realm_data in realms_data.items():
@ -201,15 +210,15 @@ class Config:
if "keyringer" in data: if "keyringer" in data:
if password_manager: if password_manager:
raise ValueError("Can only set a single password manager") raise ValueError("Can only set a single password manager")
password_manager = Keyringer.load(data.pop("keyringer")) password_manager = Keyringer.load(data["keyringer"])
if "keepass" in data: if "keepass" in data:
if password_manager: if password_manager:
raise ValueError("Can only set a single password manager") raise ValueError("Can only set a single password manager")
password_manager = Keepass.load(data.pop("keepass")) password_manager = Keepass.load(data["keepass"])
if "password-script" in data: if "password-script" in data:
if password_manager: if password_manager:
raise ValueError("Can only set a single password manager") raise ValueError("Can only set a single password manager")
password_manager = PasswordScript.load(data.pop("password-script")) password_manager = PasswordScript.load(data["password-script"])
return Config(realms=realms, password_manager=password_manager) return Config(realms=realms, password_manager=password_manager)
@ -220,7 +229,11 @@ class Config:
""" """
if realm.account is None: if realm.account is None:
raise RuntimeError("Can't get password without acccount - should use kerberos instead") raise RuntimeError("Can't get password without acccount - should use kerberos instead")
if self.password_manager:
return self.password_manager.get_password(realm.password_name)
return getpass.getpass(f"Enter password for {realm.password_name}: ") try:
if self.password_manager:
return self.password_manager.get_password(realm.password_name)
return getpass.getpass(f"Enter password for {realm.password_name}: ")
except (KeyboardInterrupt, EOFError):
raise SystemExit("Password prompt / retrieval aborted")