commit 1f28ee36226a1446c581ca554ec369cf6b1223b0 Author: Stefan Bühler Date: Fri Apr 28 02:45:32 2023 +0200 Initial diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/.pycodestyle b/.pycodestyle new file mode 100644 index 0000000..7328b15 --- /dev/null +++ b/.pycodestyle @@ -0,0 +1,9 @@ +[pycodestyle] +# E241 multiple spaces after ':' [ want to align stuff ] +# E266 too many leading '#' for block comment [ I like marking disabled code blocks with '### ' ] +# E701 multiple statements on one line (colon) [ perfectly readable ] +# E713 test for membership should be ‘not in’ [ disagree: want `not a in x` ] +# E714 test for object identity should be 'is not' [ disagree: want `not a is x` ] +# W503 Line break occurred before a binary operator [ pep8 flipped on this (also contradicts W504) ] +ignore = E241,E266,E701,E713,E714,W503 +max-line-length = 120 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b0a90df --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2023 Stefan Bühler (University of Stuttgart) +Copyright (c) 2023 Daniel Dizdarevic (University of Stuttgart) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..9b7639f --- /dev/null +++ b/README.md @@ -0,0 +1,90 @@ +# ldaptool + +CLI tool to query LDAP/AD servers + +* Configuration file to configure "realms" + * DNS domain (mapping to ldap search base as DC labels) + * LDAP servers in that domain + * Bind account + * Integration with password managers +* Various output formats + * Classic LDIF + * JSON stream (with detailed or simplified attribute values) + * CSV + * Markdown table with stretched columns (for viewing in CLI/for monospaces fonts) +* Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl) +* Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination + * By default the first 1000 entries are shown, and it errors if there are more results + * Use `-all` to show all results + +## Authentication, Protocol, Ports + +`ldaptool` always uses TLS for password based authentication, and SASL GSS-API over non-TLS for Kerberos ones. 
+ +## Config file + +Location: `~/.config/ldaptool.yaml` + +### Realms + +```yaml +realms: + EXAMPLE: + domain: "example.com" + servers: server1 server2 + account: "bind@example.com" + password_folder: mainaccounts + EXAMPLE.admin: + domain: "example.com" + servers: server1 server2 + account: "CN=admin,OU=Admins,DC=example,DC=com" + password_folder: adminaccounts + EXAMPLE.admin2: + domain: "example.com" + servers: server1 server2 + account: "CN=admin,OU=Admins,DC=example,DC=com" + password_file: localadmin2 + password_folder: adminaccounts + SUB: + domain: "sub.example.com" + servers: subserver1 subserver2 + forest_root_domain: "example.com" +``` + +The `servers` field is a whitespace separates list of hostnames in the domain. + +If a password manager is used, the `password_file` (defaults to names derived from `account`) and `password_folder` fields determine the name of the file ("secret") queried from the password manager. Here the following file names would be used: +* `EXAMPLE`: `mainaccounts/bind` +* `EXAMPLE.admin`: `adminaccounts/example.com/Admins/admin` +* `EXAMPLE.admin2`: `adminaccounts/localadmin2` + +If the `account` field isn't present `ldaptool` always uses kerberos; if `--krb` is used, `account` is ignored. + +Windows AD has a concept of a "global catalog" across all domains in a AD Forest; it uses separate ports (3268 without TLS and 3269 with TLS). +The `forest_root_domain` field can be used to set a search base for global catalog (`--gc`) queries (usually the forest root should be parent domain). + +Unless specified with `--base` the search base is derived from `domain` (or `forest_root_domain` with `--gc`) as `DC=...` for each DNS label. + +#### Script as password manager + +```yaml +password-script: keyring local decrypt +``` + +This configures a script as password manager. + +Either takes a string (split by [`shlex.split`](https://docs.python.org/3/library/shlex.html#shlex.split)) or a list of strings. 
+The password name is appended as last argument. + +#### keyringer + +```yaml +keyringer: + keyring: yourkeyringname + folder: ldapquery +``` + +This configures [`keyringer`](https://0xacab.org/rhatto/keyringer) (based on GPG) as password manager. + +`keyringer` need a "keyring" to search in, and you can (optionally) specify a folder to be +prefixed to the password names created from the realm. diff --git a/fmt.sh b/fmt.sh new file mode 100755 index 0000000..9b9a5bb --- /dev/null +++ b/fmt.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +self=$(dirname "$(readlink -f "$0")") +cd "${self}" + +python3 -m black src +python3 -m isort src diff --git a/lints.sh b/lints.sh new file mode 100755 index 0000000..0025ddf --- /dev/null +++ b/lints.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +set -e + +cd "$(dirname "$(readlink "$0")")" + +sources=($@) +if [ "${#sources[@]}" -eq 0 ]; then + sources=(src) +fi + +rc=0 + +run() { + # remember last failure + if "$@"; then :; else rc=$?; fi +} + +echo "[pycodestyle]" +run pycodestyle --config=.pycodestyle "${sources[@]}" +echo "[pyflakes]" +run python3 -m pyflakes "${sources[@]}" +echo "[mypy]" +run mypy "${sources[@]}" +echo "[black]" +run python3 -m black --check "${sources[@]}" +echo "[isort]" +run python3 -m isort --check-only "${sources[@]}" + +exit $rc diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..0b796a7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,50 @@ +[build-system] +requires = ["flit_core >=3.2,<4"] +build-backend = "flit_core.buildapi" + +[project] +name = "ldaptool" +authors = [ + {name = "Stefan Bühler", email = "stefan.buehler@tik.uni-stuttgart.de"}, + {name = "Daniel Dizdarevic", email = "daniel.dizdarevic@tik.uni-stuttgart.de"}, +] +readme = "README.md" +license = {file = "LICENSE"} +classifiers = [ + "Private :: Do Not Upload", + "License :: OSI Approved :: MIT License", +] +dynamic = ["version", "description"] + +requires-python = "~=3.11" +dependencies = [ + "python-ldap", + "PyYAML", +] + 
+[project.scripts] +ldaptool = "ldaptool._main:main" + +[project.urls] +# Documentation = "..." +Source = "https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool" + +[tool.black] +line-length = 120 + +[tool.mypy] +disallow_any_generics = true +disallow_untyped_defs = true +warn_redundant_casts = true +warn_return_any = true +warn_unused_configs = true +warn_unused_ignores = true +warn_unreachable = true + +[[tool.mypy.overrides]] +module = [ + "ldap", + "ldap.dn", + "ldap.controls.libldap", +] +ignore_missing_imports = true diff --git a/src/ldaptool/__init__.py b/src/ldaptool/__init__.py new file mode 100644 index 0000000..2f390ba --- /dev/null +++ b/src/ldaptool/__init__.py @@ -0,0 +1,5 @@ +""" CLI ldapsearch tool with json and table output """ + +from __future__ import annotations + +__version__ = "0.1" diff --git a/src/ldaptool/_main.py b/src/ldaptool/_main.py new file mode 100644 index 0000000..527ec37 --- /dev/null +++ b/src/ldaptool/_main.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +import argparse +import csv +import subprocess +import sys +import typing + +from ldaptool import decode, search +from ldaptool._utils.ldap import Result, SizeLimitExceeded + + +class _Context: + def __init__(self) -> None: + parser = argparse.ArgumentParser() + arguments_p = search.Arguments.add_to_parser(parser) + args = parser.parse_args() + try: + self.config = search.Config.load() + except Exception as e: + raise SystemExit(f"config error: {e}") + self.arguments = arguments_p.from_args(args) + + def run(self) -> None: + # starting the search sets the base we want to print + search_iterator = search.search(config=self.config, arguments=self.arguments) + self._run_with_filters(search_iterator) + + def _run_with_filters(self, search_iterator: typing.Iterable[Result]) -> None: + output: typing.IO[str] = sys.stdout + procs: list[subprocess.Popen[str]] = [] + + def add_filter(cmd: list[str]) -> None: + nonlocal output + proc = subprocess.Popen(cmd, 
stdin=subprocess.PIPE, stdout=output, text=True) + procs.append(proc) + if output != sys.stdout: + output.close() + assert proc.stdin + output = proc.stdin + + try: + if self.arguments.table: + add_filter(["csvlook"]) + if self.arguments.sort: + add_filter(["csvsort", "--blanks"]) + self._run_search(search_iterator, stream=output) + finally: + if procs: + output.close() + for proc in reversed(procs): + proc.wait() + + def _run_search(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None: + decoder = decode.Decoder(arguments=self.arguments) + + num_responses = 0 + num_entries = 0 + + ldif_output = not (self.arguments.csv or self.arguments.json or self.arguments.human) + + if ldif_output: + print("# extended LDIF") + print("#") + print("# LDAPv3") + print(f"# base <{self.arguments.base}> with scope subtree") + print(f"# filter: {self.arguments.filter}") + if self.arguments.attributes: + print(f"# requesting: {' '.join(self.arguments.attributes)}") + else: + print("# requesting: ALL") + print("#") + print() + + if self.arguments.csv: + csv_out = csv.DictWriter( + stream, + fieldnames=self.arguments.columns, + lineterminator="\n", + extrasaction="ignore", + ) + csv_out.writeheader() + # dicts contain data by lower case key + csv_out.fieldnames = [col.lower() for col in self.arguments.columns] + + try: + for dn, entry in search_iterator: + num_responses += 1 + if dn is None: + if not self.arguments.csv: + print("# search reference") + for ref in entry: + assert isinstance(ref, str) + print(f"ref: {ref}") + print() + continue + # normal entry + assert not isinstance(entry, list) + num_entries += 1 + obj = decoder.read(dn=dn, entry=entry) + if self.arguments.csv: + csv_out.writerow(decoder.human(dn=dn, entry=obj)) + else: + decoder.emit(dn=dn, entry=obj) + except SizeLimitExceeded as e: + raise SystemExit(f"Error: {e}") + + if ldif_output: + print(f"# numResponses: {num_responses}") + print(f"# numEntries: {num_entries}") + + +def main() -> 
None: + ctx = _Context() + ctx.run() diff --git a/src/ldaptool/_utils/__init__.py b/src/ldaptool/_utils/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/src/ldaptool/_utils/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/src/ldaptool/_utils/argclasses.py b/src/ldaptool/_utils/argclasses.py new file mode 100644 index 0000000..7c65fb4 --- /dev/null +++ b/src/ldaptool/_utils/argclasses.py @@ -0,0 +1,120 @@ +from __future__ import annotations + +import abc +import argparse +import dataclasses +import typing + + +class _BaseArgumentDefinition(abc.ABC): + __slots__ = () + + @abc.abstractmethod + def add_argument(self, *, parser: argparse.ArgumentParser, field: dataclasses.Field[typing.Any], dest: str) -> None: + raise NotImplementedError() + + +@dataclasses.dataclass(slots=True, kw_only=True) +class _ArgumentDefinition(_BaseArgumentDefinition): + flags: tuple[str, ...] = () + required: bool = False + help: str + + def add_argument(self, *, parser: argparse.ArgumentParser, field: dataclasses.Field[typing.Any], dest: str) -> None: + if field.type == "bool": + parser.add_argument( + f"--{field.name}", + *self.flags, + default=field.default, + dest=dest, + action=argparse.BooleanOptionalAction, + help=f"{self.help} (default: %(default)s)", + ) + elif field.type.startswith("list["): + parser.add_argument( + f"--{field.name}", + *self.flags, + required=self.required, + # not passing default (nor default_factor). 
+ # if argument isn't used, it will be None, and the + # dataclass default is triggered + dest=dest, + action="append", + help=f"{self.help}", + ) + else: + parser.add_argument( + f"--{field.name}", + *self.flags, + required=self.required, + default=field.default, + dest=dest, + help=f"{self.help}", + ) + + +def arg(*flags: str, required: bool = False, help: str) -> dict[typing.Any, typing.Any]: + return {id(_BaseArgumentDefinition): _ArgumentDefinition(flags=flags, required=required, help=help)} + + +@dataclasses.dataclass(slots=True, kw_only=True) +class _ManualArgumentDefinition(_BaseArgumentDefinition): + callback: typing.Callable[[argparse.ArgumentParser, str], None] + + def add_argument(self, *, parser: argparse.ArgumentParser, field: dataclasses.Field[typing.Any], dest: str) -> None: + self.callback(parser, dest) + + +def manual(callback: typing.Callable[[argparse.ArgumentParser, str], None]) -> dict[typing.Any, typing.Any]: + return {id(_BaseArgumentDefinition): _ManualArgumentDefinition(callback=callback)} + + +_TArgs = typing.TypeVar("_TArgs", bound="BaseArguments") + + +@dataclasses.dataclass(slots=True, kw_only=True) +class BaseArguments: + @classmethod + def add_fields_to_parser( + cls: type[_TArgs], + parser: argparse.ArgumentParser, + *, + prefix: str = "", + ) -> None: + for field in dataclasses.fields(cls): + argdef = field.metadata.get(id(_BaseArgumentDefinition), None) + if argdef is None: + continue + assert isinstance(argdef, _BaseArgumentDefinition) + dest = f"{prefix}{field.name}" + argdef.add_argument(parser=parser, field=field, dest=dest) + + @classmethod + def add_to_parser( + cls: type[_TArgs], + parser: argparse.ArgumentParser, + *, + prefix: str = "", + ) -> ArgumentsParser[_TArgs]: + cls.add_fields_to_parser(parser, prefix=prefix) + return ArgumentsParser(cls=cls, prefix=prefix) + + +@dataclasses.dataclass(slots=True, kw_only=True) +class ArgumentsParser(typing.Generic[_TArgs]): + cls: type[_TArgs] + prefix: str + + def 
get_fields(self, args: argparse.Namespace) -> dict[str, typing.Any]: + data = {} + for field in dataclasses.fields(self.cls): + argdef = field.metadata.get(id(_BaseArgumentDefinition), None) + if argdef is None: + continue + value = getattr(args, f"{self.prefix}{field.name}") + if not value is None: + data[field.name] = value + return data + + def from_args(self, args: argparse.Namespace) -> _TArgs: + return self.cls(**self.get_fields(args)) diff --git a/src/ldaptool/_utils/dninfo.py b/src/ldaptool/_utils/dninfo.py new file mode 100644 index 0000000..ba429a2 --- /dev/null +++ b/src/ldaptool/_utils/dninfo.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import dataclasses +import functools +import re +import typing + +import ldap +import ldap.dn + + +def _escape_backslash(value: str, *, special: str) -> str: + # escape backslash itself first + value = value.replace("\\", "\\\\") + # escape newlines and NULs with special escape sequences + value = value.replace("\n", "\\n").replace("\r", "\\r").replace("\0", "\\0") + # escape "specials" by prefixing them with backslash + pattern_class = re.escape(special) + return re.sub(f"([{pattern_class}])", r"\\\1", value) + + +@dataclasses.dataclass(frozen=True) +class DNInfo: + dn: str + parts: list[list[tuple[str, str, int]]] # list of list of (la_attr, la_value, la_flags) + + def __init__(self, *, dn: str) -> None: + parts = ldap.dn.str2dn(dn, flags=ldap.DN_FORMAT_LDAPV3) + object.__setattr__(self, "dn", dn) + object.__setattr__(self, "parts", parts) + + @functools.cached_property + def domain(self) -> str: + return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc") + + def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str: + return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc") + + @functools.cached_property + def path(self) -> str: + return self._path(escape=lambda value: _escape_backslash(value, special="/"), 
sep="/") + + @property + def full_path(self) -> str: + domain = self.domain + path = self.path + if not path: + return self.domain + if not domain: + return self.path + return f"{domain}/{path}" diff --git a/src/ldaptool/_utils/ldap.py b/src/ldaptool/_utils/ldap.py new file mode 100644 index 0000000..45e5fda --- /dev/null +++ b/src/ldaptool/_utils/ldap.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import typing + +import ldap + + +class SizeLimitExceeded(Exception): + pass + + +Entry = tuple[str, dict[str, list[bytes]]] +Ref = tuple[None, list[str]] +Result = Entry | Ref +Results = list[Result] + + +def ldap_search_ext( + ldap_con: ldap.ldapobject.LDAPObject, + base: str, + filterstr: str = "(objectClass=*)", + *, + scope: int = ldap.SCOPE_SUBTREE, + attrlist: typing.Optional[typing.Sequence[str]] = None, + pagelimit: int = 5000, + sizelimit: int = 0, + serverctrls: list[ldap.controls.RequestControl] = [], + **kwargs: typing.Any, +) -> typing.Iterable[Result]: + """ + Retrieve all results through pagination + """ + from ldap.controls.libldap import SimplePagedResultsControl + + page_ctrl = SimplePagedResultsControl(criticality=True, size=pagelimit, cookie=b"") + serverctrls = [page_ctrl] + serverctrls + + def try_get_page() -> tuple[Results, list[ldap.controls.ResponseControl]]: + response = ldap_con.search_ext( + base=base, + scope=scope, + filterstr=filterstr, + attrlist=attrlist, + serverctrls=serverctrls, + **kwargs, + ) + _rtype, results, _rmsgid, resp_controls = ldap_con.result3(response) + # print(f"Ldap search got page: rtype: {_rtype}, results: {len(results)} msgid: {_rmsgid}") + return results, resp_controls + + def get_page() -> tuple[Results, list[ldap.controls.ResponseControl]]: + if isinstance(ldap_con, ldap.ldapobject.ReconnectLDAPObject): + # ReconnectLDAPObject doesn't wrap search_ext / provide search_ext + result3 + return ldap_con._apply_method_s(lambda con: try_get_page()) # type: ignore + else: + return try_get_page() + + 
num_results = 0 + while True: + if sizelimit: + # don't get more than 1 result more than we are interested in anyway + page_ctrl.size = min(pagelimit, sizelimit - num_results + 1) + results, resp_controls = get_page() + + resp_page_controls = [ + control for control in resp_controls if control.controlType == SimplePagedResultsControl.controlType + ] + assert resp_page_controls, "The server ignores RFC 2696 control" + + # forward results from this page + for result in results: + if not result[0] is None: + # don't count refs + if sizelimit and num_results >= sizelimit: + raise SizeLimitExceeded(f"More than {sizelimit} results") + num_results += 1 + yield result + + # update cookie for next page + if not resp_page_controls[0].cookie: + # was last page, done + break + page_ctrl.cookie = resp_page_controls[0].cookie diff --git a/src/ldaptool/decode/__init__.py b/src/ldaptool/decode/__init__.py new file mode 100644 index 0000000..423e59a --- /dev/null +++ b/src/ldaptool/decode/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from ._decoder import Attribute, Decoder +from .arguments import Arguments + +__all__ = [ + "Arguments", + "Attribute", + "Decoder", +] diff --git a/src/ldaptool/decode/_decoder.py b/src/ldaptool/decode/_decoder.py new file mode 100644 index 0000000..128036b --- /dev/null +++ b/src/ldaptool/decode/_decoder.py @@ -0,0 +1,229 @@ +from __future__ import annotations + +import base64 +import dataclasses +import json +import re +import sys +import typing +import uuid + +from ldaptool._utils.dninfo import DNInfo + +from . 
import _types +from .arguments import Arguments + +TEntry = dict[str, list[bytes]] +TDecoded = dict[str, list["Attribute"]] + +CTRL = re.compile(r"[\x00-\x19]") + + +@dataclasses.dataclass(slots=True, kw_only=True) +class Attribute: + name: str + raw: bytes + utf8_clean: typing.Optional[str] + decoded: typing.Optional[str] + + @typing.overload + def __init__( + self, + *, + name: str, + raw: bytes, + arguments: Arguments, + ) -> None: + ... + + @typing.overload + def __init__( + self, + *, + name: str, + raw: bytes, + _utf8_clean: str, + ) -> None: + ... + + def __init__( + self, + *, + name: str, + raw: bytes, + arguments: typing.Optional[Arguments] = None, + _utf8_clean: typing.Optional[str] = None, + ) -> None: + self.name = name + self.raw = raw + self.utf8_clean = None + self.decoded = None + if not _utf8_clean is None: + # building fake attribute; no decoding + self.utf8_clean = _utf8_clean + return + assert arguments, "Need arguments for proper decoding" + try: + utf8_clean = raw.decode() + if not CTRL.search(utf8_clean): + self.utf8_clean = utf8_clean + except Exception: + # UTF-8 decode error + pass + self._try_decode(arguments) + + def _try_decode_sid(self) -> None: + try: + self.decoded = _types.sid.parse_raw(self.raw) + except Exception: + return + + def _try_decode_uuid(self) -> None: + try: + self.decoded = str(uuid.UUID(bytes=self.raw)) + except Exception: + return + + def _try_decode_timestamp(self, args: Arguments) -> None: + if self.utf8_clean: + try: + date = _types.timestamp.parse(self.utf8_clean) + except Exception: + return + if args.dateonly: + self.decoded = str(date.date()) + else: + self.decoded = str(date) + + def _try_decode_uac(self) -> None: + if self.utf8_clean: + try: + self.decoded = _types.uac.parse(self.utf8_clean.strip()) + except Exception: + return + + def _try_decode(self, args: Arguments) -> None: + if self.name in ("objectSid",): + self._try_decode_sid() + elif self.name in ("msExchMailboxGuid", "objectGUID"): + 
self._try_decode_uuid() + elif self.name in ( + "pwdLastSet", + "lastLogon", # DC local attribute, not synced + "lastLogonTimestamp", # set and synced across DCs if "more fresh" than msDS-LogonTimeSyncInterval + "badPasswordTime", + "accountExpires", + ): + self._try_decode_timestamp(args) + elif self.name == "userAccountControl": + self._try_decode_uac() + + @property + def _base64_value(self) -> str: + return base64.b64encode(self.raw).decode("ascii") + + def print(self) -> None: + if not self.decoded is None: + comment = self.utf8_clean + if comment is None: + comment = self._base64_value + print(f"{self.name}: {self.decoded} # {comment}") + elif not self.utf8_clean is None: + print(f"{self.name}: {self.utf8_clean}") + else: + print(f"{self.name}:: {self._base64_value}") + + def to_json(self) -> dict[str, typing.Any]: + item: dict[str, typing.Any] = {} + b64_value = self._base64_value + item["binary"] = b64_value + if not self.utf8_clean is None: + item["ldif_value"] = self.utf8_clean + if not self.decoded is None: + item["human"] = self.decoded + elif not self.utf8_clean is None: + item["human"] = self.utf8_clean + else: + item["human"] = self._base64_value + item["human_is_base64"] = True + return item + + def human(self) -> str: + if not self.decoded is None: + return self.decoded + elif not self.utf8_clean is None: + return self.utf8_clean + else: + return self._base64_value + + @staticmethod + def fake_attribute(name: str, value: str) -> Attribute: + return Attribute( + name=name, + raw=value.encode(), + _utf8_clean=value, + ) + + +@dataclasses.dataclass(slots=True, kw_only=True) +class Decoder: + arguments: Arguments + + def read(self, *, dn: str, entry: TEntry) -> dict[str, list[Attribute]]: + # lowercase attribute name in decoded dict. attribute itself still knows original for LDIF output. 
+ decoded_entry = { + name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values] + for name, raw_values in entry.items() + } + if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath: + dninfo = DNInfo(dn=dn) + if self.arguments.dndomain: + decoded_entry["dndomain"] = [ + Attribute.fake_attribute("dndomain", dninfo.domain), + ] + if self.arguments.dnpath: + decoded_entry["dnpath"] = [ + Attribute.fake_attribute("dnpath", dninfo.path), + ] + if self.arguments.dnfullpath: + decoded_entry["dnfullpath"] = [ + Attribute.fake_attribute("dnfullpath", dninfo.full_path), + ] + return decoded_entry + + def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]: + emit: dict[str, typing.Any] = dict(dn=dn) + for name, attrs in entry.items(): + emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs) + return emit + + def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]: + emit: dict[str, typing.Any] = dict(dn=dn) + for name, attrs in entry.items(): + emit[name] = [attr.to_json() for attr in attrs] + return emit + + def _emit_json(self, *, dn: str, entry: TDecoded) -> None: + if self.arguments.human: + emit = self.human(dn=dn, entry=entry) + else: + emit = self.json(dn=dn, entry=entry) + json.dump(emit, sys.stdout, ensure_ascii=False) + print() # terminate output dicts by newline + + def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None: + print(f"dn: {dn}") + for attrs in entry.values(): + for attr in attrs: + attr.print() + print() # separate entries with newlines + + def emit(self, *, dn: str, entry: TDecoded) -> None: + if self.arguments.human or self.arguments.json: + self._emit_json(dn=dn, entry=entry) + else: + self._emit_ldif(dn=dn, entry=entry) + + def handle(self, *, dn: str, entry: TEntry) -> None: + entry_attrs = self.read(dn=dn, entry=entry) + self.emit(dn=dn, entry=entry_attrs) diff --git a/src/ldaptool/decode/_types/__init__.py 
b/src/ldaptool/decode/_types/__init__.py new file mode 100644 index 0000000..10adf6c --- /dev/null +++ b/src/ldaptool/decode/_types/__init__.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from . import sid, timestamp, uac + +__all__ = [ + "sid", + "timestamp", + "uac", +] diff --git a/src/ldaptool/decode/_types/sid.py b/src/ldaptool/decode/_types/sid.py new file mode 100644 index 0000000..ca68241 --- /dev/null +++ b/src/ldaptool/decode/_types/sid.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +import struct + + +def parse_raw(data: bytes) -> str: + revision = data[0] + count_sub_auths = data[1] + # clear first two bytes for 64-bit decoding + authority_raw = b"\x00\x00" + data[2:8] + (authority,) = struct.unpack(">Q", authority_raw) + assert len(data) == 8 + 4 * count_sub_auths + sub_auths = struct.unpack_from(f"< {count_sub_auths}I", data, 8) + return f"S-{revision}-{authority}" + "".join(f"-{auth}" for auth in sub_auths) diff --git a/src/ldaptool/decode/_types/timestamp.py b/src/ldaptool/decode/_types/timestamp.py new file mode 100644 index 0000000..b6c291c --- /dev/null +++ b/src/ldaptool/decode/_types/timestamp.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import datetime + +LDAP_EPOCH = datetime.datetime(year=1601, month=1, day=1, tzinfo=datetime.timezone.utc) + + +def from_ldap_date(num_value: int) -> datetime.datetime: + secs_since_1601 = int(num_value) / 1e7 # original in 100nsec + return LDAP_EPOCH + datetime.timedelta(seconds=secs_since_1601) + + +def to_ldap_date(stamp: datetime.datetime) -> int: + secs_since_1601 = (stamp - LDAP_EPOCH).total_seconds() + return int(secs_since_1601 * 1e7) # in 100nsec + + +LDAP_DATE_MIN = to_ldap_date(datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)) +LDAP_DATE_MAX = to_ldap_date(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)) + + +def parse(value: str) -> datetime.datetime: + num_value = int(value) + if num_value >= LDAP_DATE_MAX: + return datetime.datetime.max + 
elif num_value <= LDAP_DATE_MIN: + return datetime.datetime.min + return from_ldap_date(num_value) diff --git a/src/ldaptool/decode/_types/uac.py b/src/ldaptool/decode/_types/uac.py new file mode 100644 index 0000000..aea50ca --- /dev/null +++ b/src/ldaptool/decode/_types/uac.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +import enum +import typing + + +class UserAccountControlFlags(enum.IntFlag): + SCRIPT = 0x0001 + ACCOUNTDISABLE = 0x0002 + HOMEDIR_REQUIRED = 0x0008 + LOCKOUT = 0x0010 + PASSWD_NOTREQD = 0x0020 + PASSWD_CANT_CHANGE = 0x0040 + ENCRYPTED_TEXT_PWD_ALLOWED = 0x0080 + TEMP_DUPLICATE_ACCOUNT = 0x0100 + NORMAL_ACCOUNT = 0x0200 + INTERDOMAIN_TRUST_ACCOUNT = 0x0800 + WORKSTATION_TRUST_ACCOUNT = 0x1000 + SERVER_TRUST_ACCOUNT = 0x2000 + DONT_EXPIRE_PASSWORD = 0x10000 + MNS_LOGON_ACCOUNT = 0x20000 + SMARTCARD_REQUIRED = 0x40000 + TRUSTED_FOR_DELEGATION = 0x80000 + NOT_DELEGATED = 0x100000 + USE_DES_KEY_ONLY = 0x200000 + DONT_REQ_PREAUTH = 0x400000 + PASSWORD_EXPIRED = 0x800000 + TRUSTED_TO_AUTH_FOR_DELEGATION = 0x1000000 + PARTIAL_SECRETS_ACCOUNT = 0x04000000 + + def flags(self) -> list[UserAccountControlFlags]: + # ignore "uncovered" bits for now + value = self.value + members = [] + for member in UserAccountControlFlags: + member_value = member.value + if member_value and member_value & value == member_value: + members.append(member) + return members + + +def parse(value: str) -> str: + members = UserAccountControlFlags(int(value)).flags() + return ", ".join(typing.cast(str, member.name) for member in members) diff --git a/src/ldaptool/decode/arguments.py b/src/ldaptool/decode/arguments.py new file mode 100644 index 0000000..f1e44aa --- /dev/null +++ b/src/ldaptool/decode/arguments.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import dataclasses + +from ldaptool._utils import argclasses + + +@dataclasses.dataclass(slots=True, kw_only=True) +class Arguments(argclasses.BaseArguments): + json: bool = dataclasses.field( + default=False, 
+ metadata=argclasses.arg(help="Use full json output"), + ) + human: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"), + ) + human_separator: str = dataclasses.field( + default=", ", + metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"), + ) + dateonly: bool = dataclasses.field( + default=True, + metadata=argclasses.arg(help="Use only date part of decoded timestamps"), + ) + dndomain: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"), + ) + dnpath: bool = dataclasses.field( + default=False, + metadata=argclasses.arg( + help=""" + Whether to export a virtual dnpath attribute + ('/' joined values of reversed DN without DNS labels) + """ + ), + ) + dnfullpath: bool = dataclasses.field( + default=False, + metadata=argclasses.arg( + help=""" + Whether to export a virtual dnfullpath attribute + ('/' joined values of reversed DN; DNS domain as first label) + """ + ), + ) diff --git a/src/ldaptool/search/__init__.py b/src/ldaptool/search/__init__.py new file mode 100644 index 0000000..cda684a --- /dev/null +++ b/src/ldaptool/search/__init__.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from ._search import search +from .arguments import Arguments +from .config import Config + +__all__ = [ + "Arguments", + "Config", + "search", +] diff --git a/src/ldaptool/search/_search.py b/src/ldaptool/search/_search.py new file mode 100644 index 0000000..5a423b1 --- /dev/null +++ b/src/ldaptool/search/_search.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import typing + +import ldap + +from ldaptool._utils.ldap import Result, ldap_search_ext + +from .arguments import Arguments +from .config import Config + + +def search(*, config: Config, arguments: Arguments) -> typing.Iterable[Result]: + if not arguments.realm in 
config.realms: + raise SystemExit(f"Unknown realm {arguments.realm}") + realm = config.realms[arguments.realm] + + ## fixup arguments based on config/realm + if realm.account is None: + arguments.krb = True + if not arguments.base: + arguments.base = realm.default_base(gc=arguments.gc) + + ldap_con = ldap.initialize(realm.ldap_uri(gc=arguments.gc, tls=False, server=arguments.server)) + ldap_con.set_option(ldap.OPT_REFERRALS, 0) + if arguments.krb: + ldap_con.sasl_gssapi_bind_s() + else: + ldap_con.simple_bind_s(realm.account, config.get_password(realm)) + + assert arguments.base + assert arguments.filter + + return ldap_search_ext( + ldap_con, + base=arguments.base, + filterstr=arguments.filter, + attrlist=arguments.attributes, + sizelimit=0 if arguments.all else 1000, + ) diff --git a/src/ldaptool/search/arguments.py b/src/ldaptool/search/arguments.py new file mode 100644 index 0000000..3445cca --- /dev/null +++ b/src/ldaptool/search/arguments.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +import argparse +import dataclasses +import typing + +import ldaptool.decode.arguments +from ldaptool._utils import argclasses + + +def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None: + parser.add_argument( + metavar="attributes", + dest=dest, + nargs="*", + help=""" + Attributes to lookup (and columns to display in tables). + Fake attributes `dndomain`, `dnpath` and `dnfullpath` are available (created from dn). 
+ """, + ) + + +@dataclasses.dataclass(slots=True, kw_only=True) +class Arguments(ldaptool.decode.arguments.Arguments): + # overwrite fields for fake attributes to remove them from argparse; + # we enable those based on the attribute list + dndomain: bool = False + dnpath: bool = False + dnfullpath: bool = False + + attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes)) + columns: list[str] = dataclasses.field(default_factory=list) + filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter")) + find: typing.Optional[str] = dataclasses.field( + default=None, + metadata=argclasses.arg(help="Account/Name/Email to search for (builds filter around it)"), + ) + # TODO: not calling ldapsearch anymore... + # debug: bool = dataclasses.field( + # default=False, + # metadata=argclasses.arg("-d", help="Show arguments to ldapsearch"), + # ) + gc: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"), + ) + raw: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="Don't pipe output through ldap-decode"), + ) + realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in")) + server: typing.Optional[str] = dataclasses.field( + default=None, + metadata=argclasses.arg( + help=""" + Server of realm to connect to + (attributes like lastLogon are not replicated and can vary between servers) + """, + ), + ) + all: bool = dataclasses.field( + default=False, + metadata=argclasses.arg( + help="Get all results (pagination) instead of only first 1000", + ), + ) + krb: bool = dataclasses.field( + default=False, + metadata=argclasses.arg( + help="Use kerberos authentication (ticket must be already present)", + ), + ) + base: typing.Optional[str] = dataclasses.field( + default=None, + metadata=argclasses.arg( + "-b", + help="Explicit search base 
(defaults to root of domain / forest with --gc)", + ), + ) + csv: bool = dataclasses.field( + default=False, + metadata=argclasses.arg(help="CSV output - requires list of attributes"), + ) + table: bool = dataclasses.field( + default=False, + metadata=argclasses.arg( + help="Markdown table output - requires list of attributes", + ), + ) + sort: bool = dataclasses.field( + default=False, + metadata=argclasses.arg( + help="Sorted table output - defaults to markdown --table unless --csv is given", + ), + ) + + def __post_init__(self) -> None: + if not self.filter is None: + if not self.find is None: + raise SystemExit("Can't use both --find and --filter") + elif not self.find is None: + find = self.find + self.filter = ( + f"(|(sAMAccountName={find})(email={find})(mail={find})(proxyAddresses=smtp:{find})(description={find}))" + ) + else: + # probably doesn't like empty filter? + self.filter = "(objectClass=*)" + + # can't print both csv and markdown + if self.csv and self.table: + raise SystemExit("Can't use both --table and --csv") + + if self.sort: + if not self.table and not self.csv: + # default to markdown table + self.table = True + + if self.table: + # markdown requires underlying csv + self.csv = True + + # extract special attribute names + self.columns = self.attributes # use all names for columns (headings and their order) + attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name + # create fake attributes on demand + if attributes_set.pop("dndomain", ""): + self.dndomain = True + if attributes_set.pop("dnpath", ""): + self.dnpath = True + if attributes_set.pop("dnfullpath", ""): + self.dnfullpath = True + # store remaining attributes (with original case) + self.attributes = list(attributes_set.values()) + if self.columns and not self.attributes: + # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes + self.attributes = ["dn"] + + if self.csv: + if not 
self.columns: + raise SystemExit("Table output requires attributes") + if self.json: + raise SystemExit("Can't use both --table / --csv / --sort and --json") + if self.human: + raise SystemExit("Can't use both --table / --csv / --sort and --human") + + if self.raw: + if self.csv: + raise SystemExit("Table output requires decode; --raw not allowed") + if self.json or self.human: + raise SystemExit("Decode options require decode; --raw not allowed") diff --git a/src/ldaptool/search/config.py b/src/ldaptool/search/config.py new file mode 100644 index 0000000..de32d8f --- /dev/null +++ b/src/ldaptool/search/config.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import abc +import dataclasses +import os +import os.path +import shlex +import subprocess +import typing + +import yaml + +from ldaptool._utils.dninfo import DNInfo + + +@dataclasses.dataclass +class Realm: + # yaml entry key: + name: str + # yaml fields: + domain: str + servers: list[str] # space separated in yaml + forest_root_domain: str # defaults to domain + account: typing.Optional[str] = None # DN or userPrincipalName + password_file: typing.Optional[str] = None # password file (default: dervied from account) + password_folder: typing.Optional[str] = None # subfolder in password manager + + @staticmethod + def load(name: str, data: typing.Any) -> Realm: + assert isinstance(data, dict) + domain = data.pop("domain") + servers = data.pop("servers").split() + forest_root_domain = data.pop("forest_root_domain", domain) + account = data.pop("account", None) + password_file = data.pop("password_file", None) + password_folder = data.pop("password_folder", None) + return Realm( + name=name, + domain=domain, + servers=servers, + forest_root_domain=forest_root_domain, + account=account, + password_file=password_file, + password_folder=password_folder, + ) + + def ldap_uri(self, *, gc: bool, tls: bool, server: typing.Optional[str] = None) -> str: + scheme = "ldaps" if tls else "ldap" + port = (":3269" if 
tls else ":3268") if gc else "" # default ports unless gc + if not server is None: + if not server in self.servers: + raise SystemExit(f"Server {server!r} not listed for realm {self.name}") + servers = [server] + else: + servers = self.servers + return " ".join(f"{scheme}://{server}.{self.domain}{port}" for server in servers) + + def default_base(self, *, gc: bool) -> str: + domain = self.forest_root_domain if gc else self.domain + return ",".join(f"DC={label}" for label in domain.split(".")) + + @property + def password_name(self) -> str: + """ + Name of password file for the account. + + If password_file wasn't set, it is derived from account: + + If account is using the "email address" format (userPrincipalName), + the password file is the local part. + Otherwise it is assumed to be a DN and a full path is extracted from it: + CN=Bob,OU=SomeDepartment,DC=example,DC=com + becomes: + example.com/SomeDepartment/Bob + + If a password_folder was specified, the file is searched within it. + """ + if self.account is None: + raise ValueError("Require account name to lookup password") + if not self.password_file is None: + secretname = self.password_file + elif "@" in self.account: + secretname = self.account.split("@", maxsplit=1)[0] + else: + secretname = DNInfo(dn=self.account).full_path + return os.path.join(self.password_folder or "", secretname) + + +class PasswordManager(abc.ABC): + @abc.abstractmethod + def get_password(self, password_name: str) -> str: + raise NotImplementedError() + + +@dataclasses.dataclass +class Keyringer(PasswordManager): + keyring: str + folder: str + + @staticmethod + def load(data: typing.Any) -> Keyringer: + assert isinstance(data, dict) + keyring = data.pop("keyring") + folder = data.pop("folder") + return Keyringer(keyring=keyring, folder=folder) + + def get_password(self, password_name: str) -> str: + secretname = os.path.join(self.folder, password_name) + result = subprocess.run( + [ + "keyringer", + self.keyring, + "decrypt", + 
secretname, + ], + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + check=True, + encoding="utf-8", + ) + return result.stdout.strip() + + +@dataclasses.dataclass +class PasswordScript(PasswordManager): + command: list[str] + + @staticmethod + def load(data: typing.Any) -> PasswordScript: + if isinstance(data, str): + return PasswordScript(command=shlex.split(data)) + elif isinstance(data, list): + for elem in data: + assert isinstance(elem, str) + return PasswordScript(command=data) + raise ValueError("password-script either takes string or list of strings") + + def get_password(self, password_name: str) -> str: + result = subprocess.run( + self.command + [password_name], + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + check=True, + encoding="utf-8", + ) + return result.stdout.strip() + + +@dataclasses.dataclass +class Config: + password_manager: typing.Optional[PasswordManager] = None + realms: dict[str, Realm] = dataclasses.field(default_factory=dict) + + @staticmethod + def load() -> Config: + conf_path = os.path.expanduser("~/.config/ldaptool.yaml") + if not os.path.exists(conf_path): + raise SystemExit(f"Missing config file {conf_path}") + with open(conf_path) as f: + data = yaml.safe_load(f) + assert isinstance(data, dict) + assert "realms" in data + realms_data = data.pop("realms") + assert isinstance(realms_data, dict) + realms = {} + for name, realm_data in realms_data.items(): + realms[name] = Realm.load(name, realm_data) + + password_manager: typing.Optional[PasswordManager] = None + if "keyringer" in data: + if password_manager: + raise ValueError("Can only set a single password manager") + password_manager = Keyringer.load(data.pop("keyringer")) + if "password-script" in data: + if password_manager: + raise ValueError("Can only set a single password manager") + password_manager = PasswordScript.load(data.pop("password-script")) + + return Config(realms=realms, 
password_manager=password_manager) + + def get_password(self, realm: Realm) -> str: + """ + Return password if password manager is configured. + Could support other tools as well here. + """ + if realm.account is None: + raise RuntimeError("Can't get password without acccount - should use kerberos instead") + if self.password_manager: + return self.password_manager.get_password(realm.password_name) + import getpass + + return getpass.getpass(f"Enter password for {realm.password_name}: ")