Compare commits

...

23 Commits

Author SHA1 Message Date
e8a23e0ede fix table outputs (join multiple values with separator again), use separate method for (simple) json 2023-05-12 11:16:45 +02:00
125eea5afc decode groupType 2023-05-12 11:07:45 +02:00
a936734cee run ./fmt.sh to fix lint 2023-05-11 17:29:20 +02:00
dd225c8b7a move --json to --full_json; remove --human JSON output, replace with --json, but don't merge multiple values - use list instead 2023-05-11 17:29:00 +02:00
55deb40268 decode securityIdentifier attribute as SID 2023-05-10 19:53:03 +02:00
3b5f698ff5 add argument to postprocess steps and support index/slicing in DN-related hooks; document them 2023-05-10 19:52:44 +02:00
34fcd259ef improve config loading: don't modify dicts to allow yaml repeated nodes 2023-05-10 16:25:41 +02:00
f036713d71 improve some error messages 2023-05-10 16:23:32 +02:00
f1d57487be Catch CTRL+C and CTRL+D in password prompts 2023-05-10 16:15:09 +02:00
04fd42c63b Catch invalid passwords in keepass 2023-05-10 16:01:34 +02:00
1a9829b93b handle missing KeePass entry 2023-05-10 16:00:07 +02:00
21069e892e :Fix version requirement for python3.10 2023-05-02 17:47:11 +02:00
357b1ae9cb use Enum instead of StrEnum for python3.10 2023-05-02 16:32:02 +02:00
cd7cfe451c support attribute post-processing; :<len>, and DN :domain, :path, :fullpath 2023-04-28 20:48:36 +02:00
bc1eb65738 move json output format handling to main tool from decoder 2023-04-28 20:36:52 +02:00
c03374d6df move argument/column handling to decoder (prepare for more post-processing in decoder) 2023-04-28 19:46:35 +02:00
1c5b971d86 README.md: document csvkit dependency 2023-04-28 19:29:59 +02:00
30d8f9f350 add html output format 2023-04-28 19:28:29 +02:00
dbaf301911 refactor table variant handling 2023-04-28 19:28:29 +02:00
c412af3de0 run sort internally, refactor table output into separate method 2023-04-28 19:28:29 +02:00
3a8c61ff59 ldaptool: move output arguments from search to main 2023-04-28 19:28:29 +02:00
a08154cff8 enable tls unless kerberos is used (SASL GSS-API doesn't seem to work over TLS) 2023-04-28 17:20:46 +02:00
46f54cb918 README.md: fix typo 2023-04-28 16:04:18 +02:00
14 changed files with 592 additions and 216 deletions

View File

@ -9,13 +9,42 @@ CLI tool to query LDAP/AD servers
* Integration with password managers * Integration with password managers
* Various output formats * Various output formats
* Classic LDIF * Classic LDIF
* JSON stream (with detailed or simplified attribute values) * JSON stream (with simplified or detailed attribute values)
* CSV * CSV
* Markdown table with stretched columns (for viewing in CLI/for monospaces fonts) * Markdown table with stretched columns (for viewing in CLI/for monospaces fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
* HTML
* Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl) * Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl)
* Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination * Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination
* By default the first 1000 entries are shown, and it errors if there are more results * By default the first 1000 entries are shown, and it errors if there are more results
* Use `-all` to show all results * Use `--all` to show all results
## Virtual attributes
`ldaptool` supports constructing new values from existing attributes by adding a `:<postprocess>` suffix (which can be chained apart from the length limit).
* Some suffixes support an argument as `:<postprocess>[<arg>]`.
* A single integer as postprocess suffix limits the length of the value; it replaces the last character of the output with `…` if it cut something off.
* Multi-valued attributes generate multiple virtual attrites; each value is processed individually. (The values are joined afterwards for table output if needed.)
### DN handling
DNs are decoded into lists of lists of `(name, value)` pairs (the inner list usually contains exactly one entry).
Attributes with a `DC` name are considered part of the "domain", everything else belongs to the "path".
(Usually a DN will start with path segments and end with domain segments.)
The path is read from back to front.
The following postprocess hooks are available:
* `domain`: extracts the domain as DNS FQDN (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com`)
* `path`: extracts the non-domain parts without names and separates them by `/` (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `Dep1/Someone`)
* `fullpath`: uses the `domain` as first segment in a path (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com/Dep1/Someone`)
* `dnslice`: extracts a "slice" from a DN (outer list only); the result is still in DN format.
`path`, `fullpath` and `dnslice` take an optional index/slice as argument, written in python syntax.
For `path` and `fullpath` this extracts only the given index/slice from the path (`fullpath` always includes the full FQDN as first segment), `dnslice` operates on the outer list of decoded (lists of) pairs:
* `dn:dnslice[1:]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `OU=Dep1,DC=example,DC=com`
* `dn:fullpath[:-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `example.com/Dep1`
* `dn:path[-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `Someone`
## Authentication, Protocol, Ports ## Authentication, Protocol, Ports

View File

@ -16,7 +16,7 @@ classifiers = [
] ]
dynamic = ["version", "description"] dynamic = ["version", "description"]
requires-python = "~=3.11" requires-python = "~=3.10"
dependencies = [ dependencies = [
"python-ldap", "python-ldap",
"PyYAML", "PyYAML",

View File

@ -2,24 +2,118 @@ from __future__ import annotations
import argparse import argparse
import csv import csv
import dataclasses
import enum
import html
import subprocess import subprocess
import sys import sys
import typing import typing
from ldaptool import decode, search from ldaptool import decode, search
from ldaptool._utils import argclasses
from ldaptool._utils.ldap import Result, SizeLimitExceeded from ldaptool._utils.ldap import Result, SizeLimitExceeded
class TableOutput(enum.Enum):
MARKDOWN = "markdown"
CSV = "csv"
HTML = "html"
def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
cell_s = f"<{cell}>"
cell_e = f"</{cell}>"
return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(search.Arguments):
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
html: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="HTML table output - requires list of attributes",
),
)
table_output: typing.Optional[TableOutput] = None
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
full_json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Use full json output (dn as str, attributes as list of dicts containing various represenatations)",
),
)
json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Use simple json output (dn as str, attributes map to list of human-readable strings)",
),
)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
# pick at most one in csv, (markdown) table, html
if [self.csv, self.table, self.html].count(True) > 1:
raise SystemExit("Can't use more than one table output type")
if self.csv:
self.table_output = TableOutput.CSV
elif self.table:
self.table_output = TableOutput.MARKDOWN
elif self.html:
self.table_output = TableOutput.HTML
if self.sort and self.table_output is None:
# default to markdown table
self.table_output = TableOutput.MARKDOWN
if self.table_output:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.full_json:
raise SystemExit("Can't use both table output and --json")
if self.json:
raise SystemExit("Can't use both table output and --human")
if self.raw:
if self.table_output:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.full_json or self.json:
raise SystemExit("Decode options require decode; --raw not allowed")
class _Context: class _Context:
def __init__(self) -> None: def __init__(self) -> None:
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
arguments_p = search.Arguments.add_to_parser(parser) arguments_p = Arguments.add_to_parser(parser)
args = parser.parse_args() args = parser.parse_args()
try: try:
self.config = search.Config.load() self.config = search.Config.load()
except Exception as e: except Exception as e:
raise SystemExit(f"config error: {e}") raise SystemExit(f"config error: {e!r}")
try:
self.arguments = arguments_p.from_args(args) self.arguments = arguments_p.from_args(args)
except decode.InvalidStep as e:
raise SystemExit(f"invalid arguments: {e}")
def run(self) -> None: def run(self) -> None:
# starting the search sets the base we want to print # starting the search sets the base we want to print
@ -40,24 +134,60 @@ class _Context:
output = proc.stdin output = proc.stdin
try: try:
if self.arguments.table: if self.arguments.table_output == TableOutput.MARKDOWN:
add_filter(["csvlook"]) add_filter(["csvlook"])
if self.arguments.sort: if self.arguments.table_output:
add_filter(["csvsort", "--blanks"]) self._table_output(search_iterator, stream=output)
self._run_search(search_iterator, stream=output) else:
self._ldif_or_json_output(search_iterator, stream=output)
finally: finally:
if procs: if procs:
output.close() output.close()
for proc in reversed(procs): for proc in reversed(procs):
proc.wait() proc.wait()
def _run_search(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None: def _to_table_lines(self, search_iterator: typing.Iterable[Result]) -> typing.Iterable[tuple[str, ...]]:
decoder = decode.Decoder(arguments=self.arguments)
# "human" (json) dicts contain data by lower case key:
column_keys = [col.lower() for col in self.arguments.columns]
try:
for dn, entry in search_iterator:
if dn is None:
continue
# normal entry
assert not isinstance(entry, list)
obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
yield tuple(obj.get(key, "") for key in column_keys)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")
def _table_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
line_iterator = self._to_table_lines(search_iterator)
if self.arguments.sort:
line_iterator = sorted(line_iterator)
if self.arguments.table_output in [TableOutput.CSV, TableOutput.MARKDOWN]:
csv_out = csv.writer(stream, lineterminator="\n")
csv_out.writerow(self.arguments.columns)
for line in line_iterator:
csv_out.writerow(line)
else:
assert self.arguments.table_output == TableOutput.HTML
stream.write("<table>\n")
stream.write(_html_escape_line(self.arguments.columns, cell="th"))
for line in line_iterator:
stream.write(_html_escape_line(line))
stream.write("</table>\n")
def _ldif_or_json_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
decoder = decode.Decoder(arguments=self.arguments) decoder = decode.Decoder(arguments=self.arguments)
num_responses = 0 num_responses = 0
num_entries = 0 num_entries = 0
ldif_output = not (self.arguments.csv or self.arguments.json or self.arguments.human) ldif_output = not (self.arguments.full_json or self.arguments.json)
if ldif_output: if ldif_output:
print("# extended LDIF") print("# extended LDIF")
@ -72,22 +202,11 @@ class _Context:
print("#") print("#")
print() print()
if self.arguments.csv:
csv_out = csv.DictWriter(
stream,
fieldnames=self.arguments.columns,
lineterminator="\n",
extrasaction="ignore",
)
csv_out.writeheader()
# dicts contain data by lower case key
csv_out.fieldnames = [col.lower() for col in self.arguments.columns]
try: try:
for dn, entry in search_iterator: for dn, entry in search_iterator:
num_responses += 1 num_responses += 1
if dn is None: if dn is None:
if not self.arguments.csv: if ldif_output:
print("# search reference") print("# search reference")
for ref in entry: for ref in entry:
assert isinstance(ref, str) assert isinstance(ref, str)
@ -97,11 +216,13 @@ class _Context:
# normal entry # normal entry
assert not isinstance(entry, list) assert not isinstance(entry, list)
num_entries += 1 num_entries += 1
obj = decoder.read(dn=dn, entry=entry) if ldif_output:
if self.arguments.csv: decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
csv_out.writerow(decoder.human(dn=dn, entry=obj)) elif self.arguments.json:
decoder.read_and_emit_simple_json(dn=dn, entry=entry, file=stream)
else: else:
decoder.emit(dn=dn, entry=obj) assert self.arguments.full_json
decoder.read_and_emit_full_json(dn=dn, entry=entry, file=stream)
except SizeLimitExceeded as e: except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}") raise SystemExit(f"Error: {e}")

View File

@ -74,6 +74,9 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")
@dataclasses.dataclass(slots=True, kw_only=True) @dataclasses.dataclass(slots=True, kw_only=True)
class BaseArguments: class BaseArguments:
def __post_init__(self) -> None:
pass
@classmethod @classmethod
def add_fields_to_parser( def add_fields_to_parser(
cls: type[_TArgs], cls: type[_TArgs],

View File

@ -33,19 +33,26 @@ class DNInfo:
def domain(self) -> str: def domain(self) -> str:
return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc") return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc")
def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str: def _path(self, *, escape: typing.Callable[[str], str], sep: str, selection: slice = slice(None)) -> str:
return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc") rev_flattened = [ava[1] for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc"]
return sep.join(value for value in rev_flattened[selection])
def sliced_path(self, selection: slice, /) -> str:
return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/", selection=selection)
@functools.cached_property @functools.cached_property
def path(self) -> str: def path(self) -> str:
return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/") return self.sliced_path(slice(None))
@property def sliced_full_path(self, selection: slice, /) -> str:
def full_path(self) -> str:
domain = self.domain domain = self.domain
path = self.path path = self.sliced_path(selection)
if not path: if not path:
return self.domain return self.domain
if not domain: if not domain:
return self.path return self.path
return f"{domain}/{path}" return f"{domain}/{path}"
@property
def full_path(self) -> str:
return self.sliced_full_path(slice(None))

View File

@ -1,10 +1,12 @@
from __future__ import annotations from __future__ import annotations
from ._decoder import Attribute, Decoder from ._decoder import Attribute, Decoder
from ._postprocess import InvalidStep
from .arguments import Arguments from .arguments import Arguments
__all__ = [ __all__ = [
"Arguments", "Arguments",
"Attribute", "Attribute",
"Decoder", "Decoder",
"InvalidStep",
] ]

View File

@ -8,8 +8,6 @@ import sys
import typing import typing
import uuid import uuid
from ldaptool._utils.dninfo import DNInfo
from . import _types from . import _types
from .arguments import Arguments from .arguments import Arguments
@ -102,8 +100,15 @@ class Attribute:
except Exception: except Exception:
return return
def _try_decode_grouptype(self) -> None:
if self.utf8_clean:
try:
self.decoded = _types.grouptype.parse(self.utf8_clean.strip())
except Exception:
return
def _try_decode(self, args: Arguments) -> None: def _try_decode(self, args: Arguments) -> None:
if self.name in ("objectSid",): if self.name in ("objectSid", "securityIdentifier"):
self._try_decode_sid() self._try_decode_sid()
elif self.name in ("msExchMailboxGuid", "objectGUID"): elif self.name in ("msExchMailboxGuid", "objectGUID"):
self._try_decode_uuid() self._try_decode_uuid()
@ -117,21 +122,23 @@ class Attribute:
self._try_decode_timestamp(args) self._try_decode_timestamp(args)
elif self.name == "userAccountControl": elif self.name == "userAccountControl":
self._try_decode_uac() self._try_decode_uac()
elif self.name == "groupType":
self._try_decode_grouptype()
@property @property
def _base64_value(self) -> str: def _base64_value(self) -> str:
return base64.b64encode(self.raw).decode("ascii") return base64.b64encode(self.raw).decode("ascii")
def print(self) -> None: def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
if not self.decoded is None: if not self.decoded is None:
comment = self.utf8_clean comment = self.utf8_clean
if comment is None: if comment is None:
comment = self._base64_value comment = self._base64_value
print(f"{self.name}: {self.decoded} # {comment}") print(f"{self.name}: {self.decoded} # {comment}", file=file)
elif not self.utf8_clean is None: elif not self.utf8_clean is None:
print(f"{self.name}: {self.utf8_clean}") print(f"{self.name}: {self.utf8_clean}", file=file)
else: else:
print(f"{self.name}:: {self._base64_value}") print(f"{self.name}:: {self._base64_value}", file=file)
def to_json(self) -> dict[str, typing.Any]: def to_json(self) -> dict[str, typing.Any]:
item: dict[str, typing.Any] = {} item: dict[str, typing.Any] = {}
@ -175,55 +182,75 @@ class Decoder:
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values] name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
for name, raw_values in entry.items() for name, raw_values in entry.items()
} }
if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
dninfo = DNInfo(dn=dn) for attr, post_processes in self.arguments.post_process.items():
if self.arguments.dndomain: if attr == "dn":
decoded_entry["dndomain"] = [ values = [dn]
Attribute.fake_attribute("dndomain", dninfo.domain), else:
] attrs = decoded_entry.get(attr, None)
if self.arguments.dnpath: if attrs is None:
decoded_entry["dnpath"] = [ continue
Attribute.fake_attribute("dnpath", dninfo.path), values = [at.human() for at in attrs]
] for column, post_process in post_processes.items():
if self.arguments.dnfullpath: decoded_entry[column] = [
decoded_entry["dnfullpath"] = [ Attribute.fake_attribute(column, post_process.process(value)) for value in values
Attribute.fake_attribute("dnfullpath", dninfo.full_path),
] ]
return decoded_entry return decoded_entry
def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]: def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn) emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in entry.items(): for name, attrs in obj.items():
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs) emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
return emit return emit
def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]: def simple_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn) emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in entry.items(): for name, attrs in obj.items():
emit[name] = [attr.human() for attr in attrs]
return emit
def emit_simple_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
emit = self.simple_json(dn=dn, obj=obj)
json.dump(emit, file, ensure_ascii=False)
print(file=file) # terminate output dicts by newline
def read_and_emit_simple_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_simple_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def full_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in obj.items():
emit[name] = [attr.to_json() for attr in attrs] emit[name] = [attr.to_json() for attr in attrs]
return emit return emit
def _emit_json(self, *, dn: str, entry: TDecoded) -> None: def emit_full_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
if self.arguments.human: emit = self.full_json(dn=dn, obj=obj)
emit = self.human(dn=dn, entry=entry) json.dump(emit, file, ensure_ascii=False)
else: print(file=file) # terminate output dicts by newline
emit = self.json(dn=dn, entry=entry)
json.dump(emit, sys.stdout, ensure_ascii=False)
print() # terminate output dicts by newline
def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None: def read_and_emit_full_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
print(f"dn: {dn}") self.emit_full_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
for attrs in entry.values():
def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
print(f"dn: {dn}", file=file)
attrs: typing.Optional[list[Attribute]]
if not self.arguments.attributes:
# show all attributes - use order from server
for attrs in obj.values():
for attr in attrs: for attr in attrs:
attr.print() attr.print(file=file)
print() # separate entries with newlines
def emit(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human or self.arguments.json:
self._emit_json(dn=dn, entry=entry)
else: else:
self._emit_ldif(dn=dn, entry=entry) # only selected columns; use given order
for column in self.arguments.columns_keys:
if column == "dn":
continue # already printed dn
attrs = obj.get(column, None)
if attrs is None:
continue
for attr in attrs:
attr.print(file=file)
print(file=file) # separate entries with newlines
def handle(self, *, dn: str, entry: TEntry) -> None: def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
entry_attrs = self.read(dn=dn, entry=entry) self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
self.emit(dn=dn, entry=entry_attrs)

View File

@ -0,0 +1,195 @@
from __future__ import annotations
import abc
import dataclasses
import typing
import ldap.dn
from ldaptool._utils.dninfo import DNInfo
class Step(abc.ABC):
__slots__ = ()
@abc.abstractmethod
def step(self, value: str) -> str:
...
def _args_to_slice(args: str) -> slice:
args = args.strip()
if not args:
return slice(None)
params: list[typing.Optional[int]] = []
for arg in args.split(":"):
arg = arg.strip()
if arg:
params.append(int(arg))
else:
params.append(None)
if len(params) == 1:
assert isinstance(params[0], int)
ndx = params[0]
if ndx == -1:
return slice(ndx, None) # from last element to end - still exactly one element
# this doesn't work for ndx == -1: slice(-1, 0) is always empty. otherwise it should return [ndx:][:1].
return slice(ndx, ndx + 1)
return slice(*params)
@dataclasses.dataclass(slots=True)
class MaxLength(Step):
limit: int
def step(self, value: str) -> str:
if not self.limit or len(value) <= self.limit:
return value
return value[: self.limit - 1] + ""
@dataclasses.dataclass(slots=True)
class DNDomain(Step):
def __init__(self, args: str) -> None:
if args:
raise ValueError(":domain doesn't support an argument")
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.domain
@dataclasses.dataclass(slots=True)
class DNPath(Step):
path_slice: slice
def __init__(self, args: str) -> None:
self.path_slice = _args_to_slice(args)
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.sliced_path(self.path_slice)
@dataclasses.dataclass(slots=True)
class DNFullPath(Step):
path_slice: slice
def __init__(self, args: str) -> None:
self.path_slice = _args_to_slice(args)
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.sliced_full_path(self.path_slice)
@dataclasses.dataclass(slots=True)
class DNSlice(Step):
slice: slice
def __init__(self, args: str) -> None:
self.slice = _args_to_slice(args)
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return ldap.dn.dn2str(dninfo.parts[self.slice]) # type: ignore
_STEPS: dict[str, typing.Callable[[str], Step]] = {
"domain": DNDomain,
"path": DNPath,
"fullpath": DNFullPath,
"dnslice": DNSlice,
}
class InvalidStep(Exception):
pass
@dataclasses.dataclass(slots=True)
class PostProcess:
steps: list[Step]
def process(self, value: str) -> str:
for step in self.steps:
value = step.step(value)
return value
def parse_steps(steps: str) -> PostProcess:
result: list[Step] = []
cur_id_start = 0
cur_args_start = -1
current_id = ""
current_args = ""
count_brackets = 0
step_done = False
def handle_step() -> None:
nonlocal cur_id_start, cur_args_start, current_id, current_args, step_done
assert step_done
step_i = _STEPS.get(current_id, None)
if step_i is None:
try:
max_len = int(current_id)
result.append(MaxLength(max_len))
except ValueError:
raise InvalidStep(f"Unknown post-processing step {current_id!r}")
else:
result.append(step_i(current_args))
cur_id_start = pos + 1
cur_args_start = -1
current_id = ""
current_args = ""
step_done = False
for pos, char in enumerate(steps):
if step_done:
if char != ":":
raise InvalidStep(f"Require : after step, found {char!r} at pos {pos}")
handle_step()
elif char == "[":
if count_brackets == 0:
# end of identifier
current_id = steps[cur_id_start:pos]
cur_args_start = pos + 1
count_brackets += 1
elif char == "]":
count_brackets -= 1
if count_brackets == 0:
current_args = steps[cur_args_start:pos]
step_done = True
elif count_brackets:
continue
elif not char.isalnum():
raise InvalidStep(f"Expecting either alphanumeric, ':' or '[', got {char!r} at {pos}")
if not step_done:
current_id = steps[cur_id_start:]
if current_id:
step_done = True
if step_done:
handle_step()
return PostProcess(result)

View File

@ -1,8 +1,9 @@
from __future__ import annotations from __future__ import annotations
from . import sid, timestamp, uac from . import grouptype, sid, timestamp, uac
__all__ = [ __all__ = [
"grouptype",
"sid", "sid",
"timestamp", "timestamp",
"uac", "uac",

View File

@ -0,0 +1,29 @@
from __future__ import annotations
import enum
import typing
class GroupTypeFlags(enum.IntFlag):
SYSTEM = 0x00000001
SCOPE_GLOBAL = 0x00000002
SCOPE_DOMAIN = 0x00000004
SCOPE_UNIVERSAL = 0x00000008
APP_BASIC = 0x00000010
APP_QUERY = 0x00000020
SECURITY = 0x80000000 # otherwise distribution
def flags(self) -> list[GroupTypeFlags]:
# ignore "uncovered" bits for now
value = self.value
members = []
for member in GroupTypeFlags:
member_value = member.value
if member_value and member_value & value == member_value:
members.append(member)
return members
def parse(value: str) -> str:
members = GroupTypeFlags(int(value)).flags()
return ", ".join(typing.cast(str, member.name) for member in members)

View File

@ -1,47 +1,78 @@
from __future__ import annotations from __future__ import annotations
import argparse
import dataclasses import dataclasses
from ldaptool._utils import argclasses from ldaptool._utils import argclasses
from . import _postprocess
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True) @dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(argclasses.BaseArguments): class Arguments(argclasses.BaseArguments):
json: bool = dataclasses.field( columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
default=False, columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names
metadata=argclasses.arg(help="Use full json output"), attributes: list[str] = dataclasses.field(default_factory=list)
)
human: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
)
human_separator: str = dataclasses.field( human_separator: str = dataclasses.field(
default=", ", default=", ",
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"), metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
) )
dateonly: bool = dataclasses.field( dateonly: bool = dataclasses.field(
default=True, default=True,
metadata=argclasses.arg(help="Use only date part of decoded timestamps"), metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
) )
dndomain: bool = dataclasses.field(
default=False, post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)
metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"),
) def __post_init__(self) -> None:
dnpath: bool = dataclasses.field( super(Arguments, self).__post_init__() # super() not working here, unclear why.
default=False,
metadata=argclasses.arg( # extract special attribute names
help=""" all_attributes = False
Whether to export a virtual dnpath attribute attributes_set: set[str] = set()
('/' joined values of reversed DN without DNS labels) self.columns_keys = []
""" for column in list(self.columns):
), column = column.lower()
)
dnfullpath: bool = dataclasses.field( if column == "*":
default=False, # '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
metadata=argclasses.arg( self.columns.remove("*")
help=""" all_attributes = True
Whether to export a virtual dnfullpath attribute continue
('/' joined values of reversed DN; DNS domain as first label)
""" self.columns_keys.append(column)
),
) if column == "dndomain":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("domain")
attributes_set.add("dn")
elif column == "dnpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("path")
attributes_set.add("dn")
elif column == "dnfullpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("fullpath")
attributes_set.add("dn")
else:
col_parts = column.split(":", maxsplit=1)
attributes_set.add(col_parts[0])
if len(col_parts) == 2:
source, steps = col_parts
self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(steps)
if all_attributes:
self.attributes = []
else:
self.attributes = list(attributes_set)

View File

@ -21,7 +21,7 @@ def search(*, config: Config, arguments: Arguments) -> typing.Iterable[Result]:
if not arguments.base: if not arguments.base:
arguments.base = realm.default_base(gc=arguments.gc) arguments.base = realm.default_base(gc=arguments.gc)
ldap_con = ldap.initialize(realm.ldap_uri(gc=arguments.gc, tls=False, server=arguments.server)) ldap_con = ldap.initialize(realm.ldap_uri(gc=arguments.gc, tls=not arguments.krb, server=arguments.server))
ldap_con.set_option(ldap.OPT_REFERRALS, 0) ldap_con.set_option(ldap.OPT_REFERRALS, 0)
if arguments.krb: if arguments.krb:
ldap_con.sasl_gssapi_bind_s() ldap_con.sasl_gssapi_bind_s()

View File

@ -1,6 +1,5 @@
from __future__ import annotations from __future__ import annotations
import argparse
import dataclasses import dataclasses
import typing import typing
@ -8,28 +7,8 @@ import ldaptool.decode.arguments
from ldaptool._utils import argclasses from ldaptool._utils import argclasses
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True) @dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(ldaptool.decode.arguments.Arguments): class Arguments(ldaptool.decode.arguments.Arguments):
# overwrite fields for fake attributes to remove them from argparse;
# we enable those based on the attribute list
dndomain: bool = False
dnpath: bool = False
dnfullpath: bool = False
attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
columns: list[str] = dataclasses.field(default_factory=list)
filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter")) filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
find: typing.Optional[str] = dataclasses.field( find: typing.Optional[str] = dataclasses.field(
default=None, default=None,
@ -44,10 +23,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
default=False, default=False,
metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"), metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"),
) )
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in")) realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in"))
server: typing.Optional[str] = dataclasses.field( server: typing.Optional[str] = dataclasses.field(
default=None, default=None,
@ -77,24 +52,10 @@ class Arguments(ldaptool.decode.arguments.Arguments):
help="Explicit search base (defaults to root of domain / forest with --gc)", help="Explicit search base (defaults to root of domain / forest with --gc)",
), ),
) )
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
def __post_init__(self) -> None: def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
if not self.filter is None: if not self.filter is None:
if not self.find is None: if not self.find is None:
raise SystemExit("Can't use both --find and --filter") raise SystemExit("Can't use both --find and --filter")
@ -106,46 +67,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):
else: else:
# probably doesn't like empty filter? # probably doesn't like empty filter?
self.filter = "(objectClass=*)" self.filter = "(objectClass=*)"
# can't print both csv and markdown
if self.csv and self.table:
raise SystemExit("Can't use both --table and --csv")
if self.sort:
if not self.table and not self.csv:
# default to markdown table
self.table = True
if self.table:
# markdown requires underlying csv
self.csv = True
# extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
# create fake attributes on demand
if attributes_set.pop("dndomain", ""):
self.dndomain = True
if attributes_set.pop("dnpath", ""):
self.dnpath = True
if attributes_set.pop("dnfullpath", ""):
self.dnfullpath = True
# store remaining attributes (with original case)
self.attributes = list(attributes_set.values())
if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"]
if self.csv:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.json:
raise SystemExit("Can't use both --table / --csv / --sort and --json")
if self.human:
raise SystemExit("Can't use both --table / --csv / --sort and --human")
if self.raw:
if self.csv:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.json or self.human:
raise SystemExit("Decode options require decode; --raw not allowed")

View File

@ -7,6 +7,7 @@ import os
import os.path import os.path
import shlex import shlex
import subprocess import subprocess
import sys
import typing import typing
import yaml import yaml
@ -28,13 +29,13 @@ class Realm:
@staticmethod @staticmethod
def load(name: str, data: typing.Any) -> Realm: def load(name: str, data: typing.Any) -> Realm:
assert isinstance(data, dict) assert isinstance(data, dict), f"Realm section isn't a dictionary: {data!r}"
domain = data.pop("domain") domain = data["domain"]
servers = data.pop("servers").split() servers = data["servers"].split()
forest_root_domain = data.pop("forest_root_domain", domain) forest_root_domain = data.get("forest_root_domain", domain)
account = data.pop("account", None) account = data.get("account", None)
password_file = data.pop("password_file", None) password_file = data.get("password_file", None)
password_folder = data.pop("password_folder", None) password_folder = data.get("password_folder", None)
return Realm( return Realm(
name=name, name=name,
domain=domain, domain=domain,
@ -101,8 +102,8 @@ class Keyringer(PasswordManager):
@staticmethod @staticmethod
def load(data: typing.Any) -> Keyringer: def load(data: typing.Any) -> Keyringer:
assert isinstance(data, dict) assert isinstance(data, dict)
keyring = data.pop("keyring") keyring = data["keyring"]
folder = data.pop("folder") folder = data.get("folder", "")
return Keyringer(keyring=keyring, folder=folder) return Keyringer(keyring=keyring, folder=folder)
def get_password(self, password_name: str) -> str: def get_password(self, password_name: str) -> str:
@ -145,9 +146,17 @@ class Keepass(PasswordManager):
def get_password(self, password_name: str) -> str: def get_password(self, password_name: str) -> str:
import pykeepass # already made sure it is avaiable above import pykeepass # already made sure it is avaiable above
while True:
try:
password = getpass.getpass(f"KeePass password for database {self.database}: ") password = getpass.getpass(f"KeePass password for database {self.database}: ")
kp = pykeepass.PyKeePass(self.database, password=password) kp = pykeepass.PyKeePass(self.database, password=password)
break
except pykeepass.exceptions.CredentialsError:
print("Invalid password", file=sys.stderr)
entry = kp.find_entries(username=password_name, first=True) entry = kp.find_entries(username=password_name, first=True)
if not entry:
raise SystemExit(f"no KeePass entry for {password_name!r} found")
return entry.password # type: ignore return entry.password # type: ignore
@ -190,8 +199,8 @@ class Config:
with open(conf_path) as f: with open(conf_path) as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
assert isinstance(data, dict) assert isinstance(data, dict)
assert "realms" in data assert "realms" in data, "Missing realms section in config"
realms_data = data.pop("realms") realms_data = data["realms"]
assert isinstance(realms_data, dict) assert isinstance(realms_data, dict)
realms = {} realms = {}
for name, realm_data in realms_data.items(): for name, realm_data in realms_data.items():
@ -201,15 +210,15 @@ class Config:
if "keyringer" in data: if "keyringer" in data:
if password_manager: if password_manager:
raise ValueError("Can only set a single password manager") raise ValueError("Can only set a single password manager")
password_manager = Keyringer.load(data.pop("keyringer")) password_manager = Keyringer.load(data["keyringer"])
if "keepass" in data: if "keepass" in data:
if password_manager: if password_manager:
raise ValueError("Can only set a single password manager") raise ValueError("Can only set a single password manager")
password_manager = Keepass.load(data.pop("keepass")) password_manager = Keepass.load(data["keepass"])
if "password-script" in data: if "password-script" in data:
if password_manager: if password_manager:
raise ValueError("Can only set a single password manager") raise ValueError("Can only set a single password manager")
password_manager = PasswordScript.load(data.pop("password-script")) password_manager = PasswordScript.load(data["password-script"])
return Config(realms=realms, password_manager=password_manager) return Config(realms=realms, password_manager=password_manager)
@ -220,7 +229,11 @@ class Config:
""" """
if realm.account is None: if realm.account is None:
raise RuntimeError("Can't get password without acccount - should use kerberos instead") raise RuntimeError("Can't get password without acccount - should use kerberos instead")
try:
if self.password_manager: if self.password_manager:
return self.password_manager.get_password(realm.password_name) return self.password_manager.get_password(realm.password_name)
return getpass.getpass(f"Enter password for {realm.password_name}: ") return getpass.getpass(f"Enter password for {realm.password_name}: ")
except (KeyboardInterrupt, EOFError):
raise SystemExit("Password prompt / retrieval aborted")