This commit is contained in:
Stefan Bühler 2023-04-28 02:45:32 +02:00
commit 1f28ee3622
24 changed files with 1361 additions and 0 deletions

1
.gitignore vendored Normal file

@@ -0,0 +1 @@
__pycache__

9
.pycodestyle Normal file

@@ -0,0 +1,9 @@
[pycodestyle]
# E241 multiple spaces after ':' [ want to align stuff ]
# E266 too many leading '#' for block comment [ I like marking disabled code blocks with '### ' ]
# E701 multiple statements on one line (colon) [ perfectly readable ]
# E713 test for membership should be not in [ disagree: want `not a in x` ]
# E714 test for object identity should be 'is not' [ disagree: want `not a is x` ]
# W503 Line break occurred before a binary operator [ pep8 flipped on this (also contradicts W504) ]
ignore = E241,E266,E701,E713,E714,W503
max-line-length = 120

22
LICENSE Normal file

@@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2023 Stefan Bühler (University of Stuttgart)
Copyright (c) 2023 Daniel Dizdarevic (University of Stuttgart)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

90
README.md Normal file

@@ -0,0 +1,90 @@
# ldaptool
CLI tool to query LDAP/AD servers
* Configuration file to define "realms"
* DNS domain (mapping to ldap search base as DC labels)
* LDAP servers in that domain
* Bind account
* Integration with password managers
* Various output formats
* Classic LDIF
* JSON stream (with detailed or simplified attribute values)
* CSV
* Markdown table with stretched columns (for viewing in the CLI / with monospace fonts)
* Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl)
* Requires the server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination
* By default the first 1000 entries are shown, and an error is raised if there are more results
* Use `--all` to show all results
## Authentication, Protocol, Ports
`ldaptool` always uses TLS for password-based authentication, and SASL GSSAPI over a non-TLS connection for Kerberos authentication.
## Config file
Location: `~/.config/ldaptool.yaml`
### Realms
```yaml
realms:
EXAMPLE:
domain: "example.com"
servers: server1 server2
account: "bind@example.com"
password_folder: mainaccounts
EXAMPLE.admin:
domain: "example.com"
servers: server1 server2
account: "CN=admin,OU=Admins,DC=example,DC=com"
password_folder: adminaccounts
EXAMPLE.admin2:
domain: "example.com"
servers: server1 server2
account: "CN=admin,OU=Admins,DC=example,DC=com"
password_file: localadmin2
password_folder: adminaccounts
SUB:
domain: "sub.example.com"
servers: subserver1 subserver2
forest_root_domain: "example.com"
```
The `servers` field is a whitespace-separated list of hostnames in the domain.
If a password manager is used, the `password_file` field (defaulting to a name derived from `account`) and the `password_folder` field determine the name of the file ("secret") queried from the password manager. For the realms above, the following file names would be used:
* `EXAMPLE`: `mainaccounts/bind`
* `EXAMPLE.admin`: `adminaccounts/example.com/Admins/admin`
* `EXAMPLE.admin2`: `adminaccounts/localadmin2`
If the `account` field isn't present, `ldaptool` always uses Kerberos; if `--krb` is used, `account` is ignored.
Windows AD has the concept of a "global catalog" spanning all domains in an AD forest; it uses separate ports (3268 without TLS, 3269 with TLS).
The `forest_root_domain` field sets the search base for global catalog (`--gc`) queries (usually the forest root is a parent domain).
Unless specified with `--base` the search base is derived from `domain` (or `forest_root_domain` with `--gc`) as `DC=...` for each DNS label.
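The derivation is mechanical: each DNS label becomes one `DC=` component of the search base. A minimal sketch (the helper name is illustrative, not part of `ldaptool`'s API):

```python
def domain_to_base(domain: str) -> str:
    # each DNS label becomes one DC= component of the search base
    return ",".join(f"DC={label}" for label in domain.split("."))

print(domain_to_base("sub.example.com"))  # DC=sub,DC=example,DC=com
```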
#### Script as password manager
```yaml
password-script: keyring local decrypt
```
This configures a script as password manager.
The value is either a string (split with [`shlex.split`](https://docs.python.org/3/library/shlex.html#shlex.split)) or a list of strings.
The password name is appended as the last argument.
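Assembling the command line can be sketched as follows (the script and secret names here are examples, not fixed values):

```python
import shlex

password_script = "keyring local decrypt"   # value from the config file
secret_name = "adminaccounts/localadmin2"   # name derived from the realm (see above)

# a string value is split shell-style; a list value would be used as-is
cmd = shlex.split(password_script) + [secret_name]
print(cmd)  # ['keyring', 'local', 'decrypt', 'adminaccounts/localadmin2']
```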
#### keyringer
```yaml
keyringer:
keyring: yourkeyringname
folder: ldapquery
```
This configures [`keyringer`](https://0xacab.org/rhatto/keyringer) (based on GPG) as password manager.
`keyringer` needs a "keyring" to search in, and you can optionally specify a folder to be
prefixed to the password names created from the realm.

7
fmt.sh Executable file

@@ -0,0 +1,7 @@
#!/bin/sh
self=$(dirname "$(readlink -f "$0")")
cd "${self}"
python3 -m black src
python3 -m isort src

30
lints.sh Executable file

@@ -0,0 +1,30 @@
#!/bin/bash
set -e
cd "$(dirname "$(readlink -f "$0")")"
sources=("$@")
if [ "${#sources[@]}" -eq 0 ]; then
sources=(src)
fi
rc=0
run() {
# remember last failure
if "$@"; then :; else rc=$?; fi
}
echo "[pycodestyle]"
run pycodestyle --config=.pycodestyle "${sources[@]}"
echo "[pyflakes]"
run python3 -m pyflakes "${sources[@]}"
echo "[mypy]"
run mypy "${sources[@]}"
echo "[black]"
run python3 -m black --check "${sources[@]}"
echo "[isort]"
run python3 -m isort --check-only "${sources[@]}"
exit $rc

50
pyproject.toml Normal file

@@ -0,0 +1,50 @@
[build-system]
requires = ["flit_core >=3.2,<4"]
build-backend = "flit_core.buildapi"
[project]
name = "ldaptool"
authors = [
{name = "Stefan Bühler", email = "stefan.buehler@tik.uni-stuttgart.de"},
{name = "Daniel Dizdarevic", email = "daniel.dizdarevic@tik.uni-stuttgart.de"},
]
readme = "README.md"
license = {file = "LICENSE"}
classifiers = [
"Private :: Do Not Upload",
"License :: OSI Approved :: MIT License",
]
dynamic = ["version", "description"]
requires-python = "~=3.11"
dependencies = [
"python-ldap",
"PyYAML",
]
[project.scripts]
ldaptool = "ldaptool._main:main"
[project.urls]
# Documentation = "..."
Source = "https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool"
[tool.black]
line-length = 120
[tool.mypy]
disallow_any_generics = true
disallow_untyped_defs = true
warn_redundant_casts = true
warn_return_any = true
warn_unused_configs = true
warn_unused_ignores = true
warn_unreachable = true
[[tool.mypy.overrides]]
module = [
"ldap",
"ldap.dn",
"ldap.controls.libldap",
]
ignore_missing_imports = true

5
src/ldaptool/__init__.py Normal file

@@ -0,0 +1,5 @@
""" CLI ldapsearch tool with json and table output """
from __future__ import annotations
__version__ = "0.1"

115
src/ldaptool/_main.py Normal file

@@ -0,0 +1,115 @@
from __future__ import annotations
import argparse
import csv
import subprocess
import sys
import typing
from ldaptool import decode, search
from ldaptool._utils.ldap import Result, SizeLimitExceeded
class _Context:
def __init__(self) -> None:
parser = argparse.ArgumentParser()
arguments_p = search.Arguments.add_to_parser(parser)
args = parser.parse_args()
try:
self.config = search.Config.load()
except Exception as e:
raise SystemExit(f"config error: {e}")
self.arguments = arguments_p.from_args(args)
def run(self) -> None:
# starting the search sets the base we want to print
search_iterator = search.search(config=self.config, arguments=self.arguments)
self._run_with_filters(search_iterator)
def _run_with_filters(self, search_iterator: typing.Iterable[Result]) -> None:
output: typing.IO[str] = sys.stdout
procs: list[subprocess.Popen[str]] = []
def add_filter(cmd: list[str]) -> None:
nonlocal output
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=output, text=True)
procs.append(proc)
if output != sys.stdout:
output.close()
assert proc.stdin
output = proc.stdin
try:
if self.arguments.table:
add_filter(["csvlook"])
if self.arguments.sort:
add_filter(["csvsort", "--blanks"])
self._run_search(search_iterator, stream=output)
finally:
if procs:
output.close()
for proc in reversed(procs):
proc.wait()
def _run_search(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
decoder = decode.Decoder(arguments=self.arguments)
num_responses = 0
num_entries = 0
ldif_output = not (self.arguments.csv or self.arguments.json or self.arguments.human)
if ldif_output:
print("# extended LDIF")
print("#")
print("# LDAPv3")
print(f"# base <{self.arguments.base}> with scope subtree")
print(f"# filter: {self.arguments.filter}")
if self.arguments.attributes:
print(f"# requesting: {' '.join(self.arguments.attributes)}")
else:
print("# requesting: ALL")
print("#")
print()
if self.arguments.csv:
csv_out = csv.DictWriter(
stream,
fieldnames=self.arguments.columns,
lineterminator="\n",
extrasaction="ignore",
)
csv_out.writeheader()
# dicts contain data by lower case key
csv_out.fieldnames = [col.lower() for col in self.arguments.columns]
try:
for dn, entry in search_iterator:
num_responses += 1
if dn is None:
if not self.arguments.csv:
print("# search reference")
for ref in entry:
assert isinstance(ref, str)
print(f"ref: {ref}")
print()
continue
# normal entry
assert not isinstance(entry, list)
num_entries += 1
obj = decoder.read(dn=dn, entry=entry)
if self.arguments.csv:
csv_out.writerow(decoder.human(dn=dn, entry=obj))
else:
decoder.emit(dn=dn, entry=obj)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")
if ldif_output:
print(f"# numResponses: {num_responses}")
print(f"# numEntries: {num_entries}")
def main() -> None:
ctx = _Context()
ctx.run()
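The chaining in `_run_with_filters` can be illustrated standalone; a tiny upper-casing Python one-liner stands in for `csvsort`/`csvlook` (a hypothetical stand-in, chosen only so the sketch is self-contained):

```python
import subprocess
import sys

# stand-in filter: upper-cases its stdin (plays the role of csvsort/csvlook)
upper = [sys.executable, "-c",
         "import sys; sys.stdout.write(sys.stdin.read().upper())"]

# like add_filter(): each new filter writes to the previous output stream,
# and the producer writes into the newest filter's stdin
last = subprocess.Popen(upper, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
first = subprocess.Popen(upper, stdin=subprocess.PIPE, stdout=last.stdin, text=True)

first.stdin.write("hello\n")
first.stdin.close()  # signal EOF to the first filter
last.stdin.close()   # the parent must drop its copy of the middle pipe
out = last.stdout.read()
first.wait()
last.wait()
print(out, end="")  # HELLO
```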

1
src/ldaptool/_utils/__init__.py Normal file

@@ -0,0 +1 @@
from __future__ import annotations

120
src/ldaptool/_utils/argclasses.py Normal file

@@ -0,0 +1,120 @@
from __future__ import annotations
import abc
import argparse
import dataclasses
import typing
class _BaseArgumentDefinition(abc.ABC):
__slots__ = ()
@abc.abstractmethod
def add_argument(self, *, parser: argparse.ArgumentParser, field: dataclasses.Field[typing.Any], dest: str) -> None:
raise NotImplementedError()
@dataclasses.dataclass(slots=True, kw_only=True)
class _ArgumentDefinition(_BaseArgumentDefinition):
flags: tuple[str, ...] = ()
required: bool = False
help: str
def add_argument(self, *, parser: argparse.ArgumentParser, field: dataclasses.Field[typing.Any], dest: str) -> None:
if field.type == "bool":
parser.add_argument(
f"--{field.name}",
*self.flags,
default=field.default,
dest=dest,
action=argparse.BooleanOptionalAction,
help=f"{self.help} (default: %(default)s)",
)
elif field.type.startswith("list["):
parser.add_argument(
f"--{field.name}",
*self.flags,
required=self.required,
# not passing default (nor default_factory):
# if the argument isn't used, the value will be None, and the
# dataclass default is triggered
dest=dest,
action="append",
help=f"{self.help}",
)
else:
parser.add_argument(
f"--{field.name}",
*self.flags,
required=self.required,
default=field.default,
dest=dest,
help=f"{self.help}",
)
def arg(*flags: str, required: bool = False, help: str) -> dict[typing.Any, typing.Any]:
return {id(_BaseArgumentDefinition): _ArgumentDefinition(flags=flags, required=required, help=help)}
@dataclasses.dataclass(slots=True, kw_only=True)
class _ManualArgumentDefinition(_BaseArgumentDefinition):
callback: typing.Callable[[argparse.ArgumentParser, str], None]
def add_argument(self, *, parser: argparse.ArgumentParser, field: dataclasses.Field[typing.Any], dest: str) -> None:
self.callback(parser, dest)
def manual(callback: typing.Callable[[argparse.ArgumentParser, str], None]) -> dict[typing.Any, typing.Any]:
return {id(_BaseArgumentDefinition): _ManualArgumentDefinition(callback=callback)}
_TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")
@dataclasses.dataclass(slots=True, kw_only=True)
class BaseArguments:
@classmethod
def add_fields_to_parser(
cls: type[_TArgs],
parser: argparse.ArgumentParser,
*,
prefix: str = "",
) -> None:
for field in dataclasses.fields(cls):
argdef = field.metadata.get(id(_BaseArgumentDefinition), None)
if argdef is None:
continue
assert isinstance(argdef, _BaseArgumentDefinition)
dest = f"{prefix}{field.name}"
argdef.add_argument(parser=parser, field=field, dest=dest)
@classmethod
def add_to_parser(
cls: type[_TArgs],
parser: argparse.ArgumentParser,
*,
prefix: str = "",
) -> ArgumentsParser[_TArgs]:
cls.add_fields_to_parser(parser, prefix=prefix)
return ArgumentsParser(cls=cls, prefix=prefix)
@dataclasses.dataclass(slots=True, kw_only=True)
class ArgumentsParser(typing.Generic[_TArgs]):
cls: type[_TArgs]
prefix: str
def get_fields(self, args: argparse.Namespace) -> dict[str, typing.Any]:
data = {}
for field in dataclasses.fields(self.cls):
argdef = field.metadata.get(id(_BaseArgumentDefinition), None)
if argdef is None:
continue
value = getattr(args, f"{self.prefix}{field.name}")
if not value is None:
data[field.name] = value
return data
def from_args(self, args: argparse.Namespace) -> _TArgs:
return self.cls(**self.get_fields(args))
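The pattern above can be illustrated with a stripped-down, self-contained sketch (simplified on purpose: no prefixes, no `list[...]` handling, and a plain metadata dict instead of the `arg()` helper):

```python
import argparse
import dataclasses

@dataclasses.dataclass
class Args:
    # each field carries its own argparse declaration in `metadata`
    realm: str = dataclasses.field(default="", metadata={"help": "Realm to search in"})
    krb: bool = dataclasses.field(default=False, metadata={"help": "Use Kerberos"})

def build_parser(cls: type) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    for field in dataclasses.fields(cls):
        # booleans get --flag / --no-flag; everything else a plain option
        if field.type is bool:
            parser.add_argument(f"--{field.name}", default=field.default,
                                action=argparse.BooleanOptionalAction,
                                help=field.metadata["help"])
        else:
            parser.add_argument(f"--{field.name}", default=field.default,
                                help=field.metadata["help"])
    return parser

ns = build_parser(Args).parse_args(["--realm", "EXAMPLE", "--krb"])
args = Args(realm=ns.realm, krb=ns.krb)
print(args)  # Args(realm='EXAMPLE', krb=True)
```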

51
src/ldaptool/_utils/dninfo.py Normal file

@@ -0,0 +1,51 @@
from __future__ import annotations
import dataclasses
import functools
import re
import typing
import ldap
import ldap.dn
def _escape_backslash(value: str, *, special: str) -> str:
# escape backslash itself first
value = value.replace("\\", "\\\\")
# escape newlines and NULs with special escape sequences
value = value.replace("\n", "\\n").replace("\r", "\\r").replace("\0", "\\0")
# escape "specials" by prefixing them with backslash
pattern_class = re.escape(special)
return re.sub(f"([{pattern_class}])", r"\\\1", value)
@dataclasses.dataclass(frozen=True)
class DNInfo:
dn: str
parts: list[list[tuple[str, str, int]]] # list of list of (la_attr, la_value, la_flags)
def __init__(self, *, dn: str) -> None:
parts = ldap.dn.str2dn(dn, flags=ldap.DN_FORMAT_LDAPV3)
object.__setattr__(self, "dn", dn)
object.__setattr__(self, "parts", parts)
@functools.cached_property
def domain(self) -> str:
return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc")
def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str:
return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc")
@functools.cached_property
def path(self) -> str:
return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/")
@property
def full_path(self) -> str:
domain = self.domain
path = self.path
if not path:
return self.domain
if not domain:
return self.path
return f"{domain}/{path}"

83
src/ldaptool/_utils/ldap.py Normal file

@@ -0,0 +1,83 @@
from __future__ import annotations
import typing
import ldap
class SizeLimitExceeded(Exception):
pass
Entry = tuple[str, dict[str, list[bytes]]]
Ref = tuple[None, list[str]]
Result = Entry | Ref
Results = list[Result]
def ldap_search_ext(
ldap_con: ldap.ldapobject.LDAPObject,
base: str,
filterstr: str = "(objectClass=*)",
*,
scope: int = ldap.SCOPE_SUBTREE,
attrlist: typing.Optional[typing.Sequence[str]] = None,
pagelimit: int = 5000,
sizelimit: int = 0,
serverctrls: list[ldap.controls.RequestControl] = [],
**kwargs: typing.Any,
) -> typing.Iterable[Result]:
"""
Retrieve all results through pagination
"""
from ldap.controls.libldap import SimplePagedResultsControl
page_ctrl = SimplePagedResultsControl(criticality=True, size=pagelimit, cookie=b"")
serverctrls = [page_ctrl] + serverctrls
def try_get_page() -> tuple[Results, list[ldap.controls.ResponseControl]]:
response = ldap_con.search_ext(
base=base,
scope=scope,
filterstr=filterstr,
attrlist=attrlist,
serverctrls=serverctrls,
**kwargs,
)
_rtype, results, _rmsgid, resp_controls = ldap_con.result3(response)
# print(f"Ldap search got page: rtype: {_rtype}, results: {len(results)} msgid: {_rmsgid}")
return results, resp_controls
def get_page() -> tuple[Results, list[ldap.controls.ResponseControl]]:
if isinstance(ldap_con, ldap.ldapobject.ReconnectLDAPObject):
# ReconnectLDAPObject doesn't wrap search_ext / provide search_ext + result3
return ldap_con._apply_method_s(lambda con: try_get_page()) # type: ignore
else:
return try_get_page()
num_results = 0
while True:
if sizelimit:
# fetch at most one result more than we are interested in
page_ctrl.size = min(pagelimit, sizelimit - num_results + 1)
results, resp_controls = get_page()
resp_page_controls = [
control for control in resp_controls if control.controlType == SimplePagedResultsControl.controlType
]
assert resp_page_controls, "The server ignored the RFC 2696 paged results control"
# forward results from this page
for result in results:
if not result[0] is None:
# don't count refs
if sizelimit and num_results >= sizelimit:
raise SizeLimitExceeded(f"More than {sizelimit} results")
num_results += 1
yield result
# update cookie for next page
if not resp_page_controls[0].cookie:
# was last page, done
break
page_ctrl.cookie = resp_page_controls[0].cookie

10
src/ldaptool/decode/__init__.py Normal file

@@ -0,0 +1,10 @@
from __future__ import annotations
from ._decoder import Attribute, Decoder
from .arguments import Arguments
__all__ = [
"Arguments",
"Attribute",
"Decoder",
]

229
src/ldaptool/decode/_decoder.py Normal file

@@ -0,0 +1,229 @@
from __future__ import annotations
import base64
import dataclasses
import json
import re
import sys
import typing
import uuid
from ldaptool._utils.dninfo import DNInfo
from . import _types
from .arguments import Arguments
TEntry = dict[str, list[bytes]]
TDecoded = dict[str, list["Attribute"]]
CTRL = re.compile(r"[\x00-\x19]")
@dataclasses.dataclass(slots=True, kw_only=True)
class Attribute:
name: str
raw: bytes
utf8_clean: typing.Optional[str]
decoded: typing.Optional[str]
@typing.overload
def __init__(
self,
*,
name: str,
raw: bytes,
arguments: Arguments,
) -> None:
...
@typing.overload
def __init__(
self,
*,
name: str,
raw: bytes,
_utf8_clean: str,
) -> None:
...
def __init__(
self,
*,
name: str,
raw: bytes,
arguments: typing.Optional[Arguments] = None,
_utf8_clean: typing.Optional[str] = None,
) -> None:
self.name = name
self.raw = raw
self.utf8_clean = None
self.decoded = None
if not _utf8_clean is None:
# building fake attribute; no decoding
self.utf8_clean = _utf8_clean
return
assert arguments, "Need arguments for proper decoding"
try:
utf8_clean = raw.decode()
if not CTRL.search(utf8_clean):
self.utf8_clean = utf8_clean
except Exception:
# UTF-8 decode error
pass
self._try_decode(arguments)
def _try_decode_sid(self) -> None:
try:
self.decoded = _types.sid.parse_raw(self.raw)
except Exception:
return
def _try_decode_uuid(self) -> None:
try:
self.decoded = str(uuid.UUID(bytes=self.raw))
except Exception:
return
def _try_decode_timestamp(self, args: Arguments) -> None:
if self.utf8_clean:
try:
date = _types.timestamp.parse(self.utf8_clean)
except Exception:
return
if args.dateonly:
self.decoded = str(date.date())
else:
self.decoded = str(date)
def _try_decode_uac(self) -> None:
if self.utf8_clean:
try:
self.decoded = _types.uac.parse(self.utf8_clean.strip())
except Exception:
return
def _try_decode(self, args: Arguments) -> None:
if self.name in ("objectSid",):
self._try_decode_sid()
elif self.name in ("msExchMailboxGuid", "objectGUID"):
self._try_decode_uuid()
elif self.name in (
"pwdLastSet",
"lastLogon", # DC local attribute, not synced
"lastLogonTimestamp", # set and synced across DCs if "more fresh" than msDS-LogonTimeSyncInterval
"badPasswordTime",
"accountExpires",
):
self._try_decode_timestamp(args)
elif self.name == "userAccountControl":
self._try_decode_uac()
@property
def _base64_value(self) -> str:
return base64.b64encode(self.raw).decode("ascii")
def print(self) -> None:
if not self.decoded is None:
comment = self.utf8_clean
if comment is None:
comment = self._base64_value
print(f"{self.name}: {self.decoded} # {comment}")
elif not self.utf8_clean is None:
print(f"{self.name}: {self.utf8_clean}")
else:
print(f"{self.name}:: {self._base64_value}")
def to_json(self) -> dict[str, typing.Any]:
item: dict[str, typing.Any] = {}
b64_value = self._base64_value
item["binary"] = b64_value
if not self.utf8_clean is None:
item["ldif_value"] = self.utf8_clean
if not self.decoded is None:
item["human"] = self.decoded
elif not self.utf8_clean is None:
item["human"] = self.utf8_clean
else:
item["human"] = self._base64_value
item["human_is_base64"] = True
return item
def human(self) -> str:
if not self.decoded is None:
return self.decoded
elif not self.utf8_clean is None:
return self.utf8_clean
else:
return self._base64_value
@staticmethod
def fake_attribute(name: str, value: str) -> Attribute:
return Attribute(
name=name,
raw=value.encode(),
_utf8_clean=value,
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Decoder:
arguments: Arguments
def read(self, *, dn: str, entry: TEntry) -> dict[str, list[Attribute]]:
# lowercase attribute name in decoded dict. attribute itself still knows original for LDIF output.
decoded_entry = {
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
for name, raw_values in entry.items()
}
if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
dninfo = DNInfo(dn=dn)
if self.arguments.dndomain:
decoded_entry["dndomain"] = [
Attribute.fake_attribute("dndomain", dninfo.domain),
]
if self.arguments.dnpath:
decoded_entry["dnpath"] = [
Attribute.fake_attribute("dnpath", dninfo.path),
]
if self.arguments.dnfullpath:
decoded_entry["dnfullpath"] = [
Attribute.fake_attribute("dnfullpath", dninfo.full_path),
]
return decoded_entry
def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in entry.items():
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
return emit
def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in entry.items():
emit[name] = [attr.to_json() for attr in attrs]
return emit
def _emit_json(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human:
emit = self.human(dn=dn, entry=entry)
else:
emit = self.json(dn=dn, entry=entry)
json.dump(emit, sys.stdout, ensure_ascii=False)
print() # terminate output dicts by newline
def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None:
print(f"dn: {dn}")
for attrs in entry.values():
for attr in attrs:
attr.print()
print() # separate entries with newlines
def emit(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human or self.arguments.json:
self._emit_json(dn=dn, entry=entry)
else:
self._emit_ldif(dn=dn, entry=entry)
def handle(self, *, dn: str, entry: TEntry) -> None:
entry_attrs = self.read(dn=dn, entry=entry)
self.emit(dn=dn, entry=entry_attrs)

9
src/ldaptool/decode/_types/__init__.py Normal file

@@ -0,0 +1,9 @@
from __future__ import annotations
from . import sid, timestamp, uac
__all__ = [
"sid",
"timestamp",
"uac",
]

14
src/ldaptool/decode/_types/sid.py Normal file

@@ -0,0 +1,14 @@
from __future__ import annotations
import struct
def parse_raw(data: bytes) -> str:
revision = data[0]
count_sub_auths = data[1]
# clear first two bytes for 64-bit decoding
authority_raw = b"\x00\x00" + data[2:8]
(authority,) = struct.unpack(">Q", authority_raw)
assert len(data) == 8 + 4 * count_sub_auths
sub_auths = struct.unpack_from(f"< {count_sub_auths}I", data, 8)
return f"S-{revision}-{authority}" + "".join(f"-{auth}" for auth in sub_auths)
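The binary layout decoded above (revision byte, sub-authority count, 48-bit big-endian authority, then little-endian 32-bit sub-authorities) can be checked against the well-known LocalSystem SID `S-1-5-18`; this is a standalone restatement of the parsing, not an addition to the module:

```python
import struct

# S-1-5-18 (LocalSystem): revision 1, 1 sub-authority, authority 5, sub-auth 18
data = bytes([1, 1]) + (5).to_bytes(6, "big") + struct.pack("<I", 18)

revision, count = data[0], data[1]
# pad the 48-bit authority to 64 bits for big-endian decoding
(authority,) = struct.unpack(">Q", b"\x00\x00" + data[2:8])
sub_auths = struct.unpack_from(f"<{count}I", data, 8)
sid = f"S-{revision}-{authority}" + "".join(f"-{a}" for a in sub_auths)
print(sid)  # S-1-5-18
```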

28
src/ldaptool/decode/_types/timestamp.py Normal file

@@ -0,0 +1,28 @@
from __future__ import annotations
import datetime
LDAP_EPOCH = datetime.datetime(year=1601, month=1, day=1, tzinfo=datetime.timezone.utc)
def from_ldap_date(num_value: int) -> datetime.datetime:
secs_since_1601 = int(num_value) / 1e7 # original in 100nsec
return LDAP_EPOCH + datetime.timedelta(seconds=secs_since_1601)
def to_ldap_date(stamp: datetime.datetime) -> int:
secs_since_1601 = (stamp - LDAP_EPOCH).total_seconds()
return int(secs_since_1601 * 1e7) # in 100nsec
LDAP_DATE_MIN = to_ldap_date(datetime.datetime.min.replace(tzinfo=datetime.timezone.utc))
LDAP_DATE_MAX = to_ldap_date(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc))
def parse(value: str) -> datetime.datetime:
num_value = int(value)
if num_value >= LDAP_DATE_MAX:
return datetime.datetime.max
elif num_value <= LDAP_DATE_MIN:
return datetime.datetime.min
return from_ldap_date(num_value)
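These AD timestamps are FILETIME-style values: 100-nanosecond ticks since 1601-01-01 UTC. A quick standalone round-trip check of the arithmetic above:

```python
import datetime

EPOCH = datetime.datetime(1601, 1, 1, tzinfo=datetime.timezone.utc)

stamp = datetime.datetime(2023, 4, 28, tzinfo=datetime.timezone.utc)
ticks = int((stamp - EPOCH).total_seconds() * 10**7)      # like to_ldap_date
back = EPOCH + datetime.timedelta(seconds=ticks / 10**7)  # like from_ldap_date
print(back.date())  # 2023-04-28
```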

44
src/ldaptool/decode/_types/uac.py Normal file

@@ -0,0 +1,44 @@
from __future__ import annotations
import enum
import typing
class UserAccountControlFlags(enum.IntFlag):
SCRIPT = 0x0001
ACCOUNTDISABLE = 0x0002
HOMEDIR_REQUIRED = 0x0008
LOCKOUT = 0x0010
PASSWD_NOTREQD = 0x0020
PASSWD_CANT_CHANGE = 0x0040
ENCRYPTED_TEXT_PWD_ALLOWED = 0x0080
TEMP_DUPLICATE_ACCOUNT = 0x0100
NORMAL_ACCOUNT = 0x0200
INTERDOMAIN_TRUST_ACCOUNT = 0x0800
WORKSTATION_TRUST_ACCOUNT = 0x1000
SERVER_TRUST_ACCOUNT = 0x2000
DONT_EXPIRE_PASSWORD = 0x10000
MNS_LOGON_ACCOUNT = 0x20000
SMARTCARD_REQUIRED = 0x40000
TRUSTED_FOR_DELEGATION = 0x80000
NOT_DELEGATED = 0x100000
USE_DES_KEY_ONLY = 0x200000
DONT_REQ_PREAUTH = 0x400000
PASSWORD_EXPIRED = 0x800000
TRUSTED_TO_AUTH_FOR_DELEGATION = 0x1000000
PARTIAL_SECRETS_ACCOUNT = 0x04000000
def flags(self) -> list[UserAccountControlFlags]:
# ignore "uncovered" bits for now
value = self.value
members = []
for member in UserAccountControlFlags:
member_value = member.value
if member_value and member_value & value == member_value:
members.append(member)
return members
def parse(value: str) -> str:
members = UserAccountControlFlags(int(value)).flags()
return ", ".join(typing.cast(str, member.name) for member in members)
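A worked example with a stand-alone subset of the flags: the common value `512` is a plain enabled account, and `514` is the same account disabled:

```python
import enum

class UAC(enum.IntFlag):
    # small subset of the flags above, enough for the example
    ACCOUNTDISABLE = 0x0002
    NORMAL_ACCOUNT = 0x0200
    DONT_EXPIRE_PASSWORD = 0x10000

def parse(value: str) -> str:
    # same bit test as flags() above: member fully contained in the value
    v = int(value)
    return ", ".join(m.name for m in UAC if m.value and m.value & v == m.value)

print(parse("514"))  # ACCOUNTDISABLE, NORMAL_ACCOUNT
```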

47
src/ldaptool/decode/arguments.py Normal file

@@ -0,0 +1,47 @@
from __future__ import annotations
import dataclasses
from ldaptool._utils import argclasses
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(argclasses.BaseArguments):
json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use full json output"),
)
human: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
)
human_separator: str = dataclasses.field(
default=", ",
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
)
dateonly: bool = dataclasses.field(
default=True,
metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
)
dndomain: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"),
)
dnpath: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="""
Whether to export a virtual dnpath attribute
('/' joined values of reversed DN without DNS labels)
"""
),
)
dnfullpath: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="""
Whether to export a virtual dnfullpath attribute
('/' joined values of reversed DN; DNS domain as first label)
"""
),
)

11
src/ldaptool/search/__init__.py Normal file

@@ -0,0 +1,11 @@
from __future__ import annotations
from ._search import search
from .arguments import Arguments
from .config import Config
__all__ = [
"Arguments",
"Config",
"search",
]

40
src/ldaptool/search/_search.py Normal file

@@ -0,0 +1,40 @@
from __future__ import annotations
import typing
import ldap
from ldaptool._utils.ldap import Result, ldap_search_ext
from .arguments import Arguments
from .config import Config
def search(*, config: Config, arguments: Arguments) -> typing.Iterable[Result]:
if not arguments.realm in config.realms:
raise SystemExit(f"Unknown realm {arguments.realm}")
realm = config.realms[arguments.realm]
## fix up arguments based on config/realm
if realm.account is None:
arguments.krb = True
if not arguments.base:
arguments.base = realm.default_base(gc=arguments.gc)
ldap_con = ldap.initialize(realm.ldap_uri(gc=arguments.gc, tls=False, server=arguments.server))
ldap_con.set_option(ldap.OPT_REFERRALS, 0)
if arguments.krb:
ldap_con.sasl_gssapi_bind_s()
else:
ldap_con.simple_bind_s(realm.account, config.get_password(realm))
assert arguments.base
assert arguments.filter
return ldap_search_ext(
ldap_con,
base=arguments.base,
filterstr=arguments.filter,
attrlist=arguments.attributes,
sizelimit=0 if arguments.all else 1000,
)

151
src/ldaptool/search/arguments.py Normal file

@@ -0,0 +1,151 @@
from __future__ import annotations
import argparse
import dataclasses
import typing
import ldaptool.decode.arguments
from ldaptool._utils import argclasses
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` and `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(ldaptool.decode.arguments.Arguments):
# overwrite fields for fake attributes to remove them from argparse;
# we enable those based on the attribute list
dndomain: bool = False
dnpath: bool = False
dnfullpath: bool = False
attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
columns: list[str] = dataclasses.field(default_factory=list)
filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
find: typing.Optional[str] = dataclasses.field(
default=None,
metadata=argclasses.arg(help="Account/Name/Email to search for (builds filter around it)"),
)
# TODO: not calling ldapsearch anymore...
# debug: bool = dataclasses.field(
# default=False,
# metadata=argclasses.arg("-d", help="Show arguments to ldapsearch"),
# )
gc: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"),
)
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in"))
server: typing.Optional[str] = dataclasses.field(
default=None,
metadata=argclasses.arg(
help="""
Server of realm to connect to
(attributes like lastLogon are not replicated and can vary between servers)
""",
),
)
all: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Get all results (pagination) instead of only first 1000",
),
)
krb: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Use kerberos authentication (ticket must be already present)",
),
)
base: typing.Optional[str] = dataclasses.field(
default=None,
metadata=argclasses.arg(
"-b",
help="Explicit search base (defaults to root of domain / forest with --gc)",
),
)
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
def __post_init__(self) -> None:
if not self.filter is None:
if not self.find is None:
raise SystemExit("Can't use both --find and --filter")
elif not self.find is None:
find = self.find
self.filter = (
f"(|(sAMAccountName={find})(email={find})(mail={find})(proxyAddresses=smtp:{find})(description={find}))"
)
else:
# probably doesn't like empty filter?
self.filter = "(objectClass=*)"
# can't print both csv and markdown
if self.csv and self.table:
raise SystemExit("Can't use both --table and --csv")
if self.sort:
if not self.table and not self.csv:
# default to markdown table
self.table = True
if self.table:
# markdown requires underlying csv
self.csv = True
# extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
# create fake attributes on demand
if attributes_set.pop("dndomain", ""):
self.dndomain = True
if attributes_set.pop("dnpath", ""):
self.dnpath = True
if attributes_set.pop("dnfullpath", ""):
self.dnfullpath = True
# store remaining attributes (with original case)
self.attributes = list(attributes_set.values())
if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"]
if self.csv:
if not self.columns:
raise SystemExit("Table/CSV output requires a list of attributes")
if self.json:
raise SystemExit("Can't use both --table / --csv / --sort and --json")
if self.human:
raise SystemExit("Can't use both --table / --csv / --sort and --human")
if self.raw:
if self.csv:
raise SystemExit("Table output requires decoded values; --raw not allowed")
if self.json or self.human:
raise SystemExit("Can't use --json or --human with --raw; they require decoded values")
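A minimal sketch (not part of ldaptool) of what the `--find` shortcut above expands to: a single search term becomes an OR filter across the common account attributes, mirroring the f-string in `__post_init__`.

```python
def build_find_filter(find: str) -> str:
    # mirrors the --find expansion in __post_init__ above
    return (
        f"(|(sAMAccountName={find})(email={find})(mail={find})"
        f"(proxyAddresses=smtp:{find})(description={find}))"
    )

print(build_find_filter("alice"))
# (|(sAMAccountName=alice)(email=alice)(mail=alice)(proxyAddresses=smtp:alice)(description=alice))
```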

View File

@ -0,0 +1,194 @@
from __future__ import annotations
import abc
import dataclasses
import os
import os.path
import shlex
import subprocess
import typing
import yaml
from ldaptool._utils.dninfo import DNInfo
@dataclasses.dataclass
class Realm:
# yaml entry key:
name: str
# yaml fields:
domain: str
servers: list[str] # space separated in yaml
forest_root_domain: str # defaults to domain
account: typing.Optional[str] = None # DN or userPrincipalName
password_file: typing.Optional[str] = None # password file (default: derived from account)
password_folder: typing.Optional[str] = None # subfolder in password manager
@staticmethod
def load(name: str, data: typing.Any) -> Realm:
assert isinstance(data, dict)
domain = data.pop("domain")
servers = data.pop("servers").split()
forest_root_domain = data.pop("forest_root_domain", domain)
account = data.pop("account", None)
password_file = data.pop("password_file", None)
password_folder = data.pop("password_folder", None)
return Realm(
name=name,
domain=domain,
servers=servers,
forest_root_domain=forest_root_domain,
account=account,
password_file=password_file,
password_folder=password_folder,
)
def ldap_uri(self, *, gc: bool, tls: bool, server: typing.Optional[str] = None) -> str:
scheme = "ldaps" if tls else "ldap"
port = (":3269" if tls else ":3268") if gc else "" # default ports unless gc
if not server is None:
if not server in self.servers:
raise SystemExit(f"Server {server!r} not listed for realm {self.name}")
servers = [server]
else:
servers = self.servers
return " ".join(f"{scheme}://{server}.{self.domain}{port}" for server in servers)
def default_base(self, *, gc: bool) -> str:
domain = self.forest_root_domain if gc else self.domain
return ",".join(f"DC={label}" for label in domain.split("."))
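A standalone sketch of the two helpers above (hostnames and domain are illustrative): `ldap_uri` picks the global-catalog ports 3268/3269 only when `gc` is set and joins one URI per server, and `default_base` turns a DNS domain into its `DC=` components.

```python
def ldap_uri(servers: list[str], domain: str, *, gc: bool, tls: bool) -> str:
    scheme = "ldaps" if tls else "ldap"
    # global catalog listens on 3268 (plain) / 3269 (TLS); otherwise default ports
    port = (":3269" if tls else ":3268") if gc else ""
    return " ".join(f"{scheme}://{server}.{domain}{port}" for server in servers)

def default_base(domain: str) -> str:
    # "example.com" -> "DC=example,DC=com"
    return ",".join(f"DC={label}" for label in domain.split("."))

print(ldap_uri(["dc1", "dc2"], "example.com", gc=True, tls=True))
# ldaps://dc1.example.com:3269 ldaps://dc2.example.com:3269
print(default_base("example.com"))
# DC=example,DC=com
```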
@property
def password_name(self) -> str:
"""
Name of password file for the account.
If password_file wasn't set, it is derived from account:
If account is using the "email address" format (userPrincipalName),
the password file is the local part.
Otherwise it is assumed to be a DN and a full path is extracted from it:
CN=Bob,OU=SomeDepartment,DC=example,DC=com
becomes:
example.com/SomeDepartment/Bob
If a password_folder was specified, the file is searched for within it.
"""
if self.account is None:
raise ValueError("Require account name to lookup password")
if not self.password_file is None:
secretname = self.password_file
elif "@" in self.account:
secretname = self.account.split("@", maxsplit=1)[0]
else:
secretname = DNInfo(dn=self.account).full_path
return os.path.join(self.password_folder or "", secretname)
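The derivation documented in the docstring above can be sketched with a simplified stand-in for `DNInfo.full_path` (the real parser lives in `ldaptool._utils.dninfo` and handles DN escaping; this one does not):

```python
import os.path

def password_name(account: str, password_folder: str = "") -> str:
    if "@" in account:
        # userPrincipalName ("email address" format): use the local part
        secret = account.split("@", maxsplit=1)[0]
    else:
        # assume a DN: domain from the DC labels, then path components reversed
        parts = [comp.split("=", 1) for comp in account.split(",")]
        domain = ".".join(value for key, value in parts if key.upper() == "DC")
        path = [value for key, value in parts if key.upper() != "DC"]
        secret = os.path.join(domain, *reversed(path))
    return os.path.join(password_folder, secret)

print(password_name("CN=Bob,OU=SomeDepartment,DC=example,DC=com"))
# example.com/SomeDepartment/Bob
```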
class PasswordManager(abc.ABC):
@abc.abstractmethod
def get_password(self, password_name: str) -> str:
raise NotImplementedError()
@dataclasses.dataclass
class Keyringer(PasswordManager):
keyring: str
folder: str
@staticmethod
def load(data: typing.Any) -> Keyringer:
assert isinstance(data, dict)
keyring = data.pop("keyring")
folder = data.pop("folder")
return Keyringer(keyring=keyring, folder=folder)
def get_password(self, password_name: str) -> str:
secretname = os.path.join(self.folder, password_name)
result = subprocess.run(
[
"keyringer",
self.keyring,
"decrypt",
secretname,
],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
check=True,
encoding="utf-8",
)
return result.stdout.strip()
@dataclasses.dataclass
class PasswordScript(PasswordManager):
command: list[str]
@staticmethod
def load(data: typing.Any) -> PasswordScript:
if isinstance(data, str):
return PasswordScript(command=shlex.split(data))
elif isinstance(data, list):
for elem in data:
assert isinstance(elem, str)
return PasswordScript(command=data)
raise ValueError("password-script either takes string or list of strings")
def get_password(self, password_name: str) -> str:
result = subprocess.run(
self.command + [password_name],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
check=True,
encoding="utf-8",
)
return result.stdout.strip()
@dataclasses.dataclass
class Config:
password_manager: typing.Optional[PasswordManager] = None
realms: dict[str, Realm] = dataclasses.field(default_factory=dict)
@staticmethod
def load() -> Config:
conf_path = os.path.expanduser("~/.config/ldaptool.yaml")
if not os.path.exists(conf_path):
raise SystemExit(f"Missing config file {conf_path}")
with open(conf_path) as f:
data = yaml.safe_load(f)
assert isinstance(data, dict)
assert "realms" in data
realms_data = data.pop("realms")
assert isinstance(realms_data, dict)
realms = {}
for name, realm_data in realms_data.items():
realms[name] = Realm.load(name, realm_data)
password_manager: typing.Optional[PasswordManager] = None
if "keyringer" in data:
if password_manager:
raise ValueError("Can only set a single password manager")
password_manager = Keyringer.load(data.pop("keyringer"))
if "password-script" in data:
if password_manager:
raise ValueError("Can only set a single password manager")
password_manager = PasswordScript.load(data.pop("password-script"))
return Config(realms=realms, password_manager=password_manager)
def get_password(self, realm: Realm) -> str:
"""
Return password if password manager is configured.
Could support other tools as well here.
"""
if realm.account is None:
raise RuntimeError("Can't get password without account - should use Kerberos instead")
if self.password_manager:
return self.password_manager.get_password(realm.password_name)
import getpass
return getpass.getpass(f"Enter password for {realm.password_name}: ")
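A config file that `Config.load` above would accept might look like the following (all hostnames, account names and folders are placeholders; at most one of `keyringer` / `password-script` may be set):

```yaml
# ~/.config/ldaptool.yaml (illustrative values)
keyringer:
  keyring: default
  folder: ldap
realms:
  example:
    domain: ad.example.com
    servers: dc1 dc2               # space separated
    forest_root_domain: example.com
    account: alice@ad.example.com
```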