Compare commits

...

16 Commits

Author SHA1 Message Date
e8a23e0ede fix table outputs (join multiple values with separator again), use separate method for (simple) json 2023-05-12 11:16:45 +02:00
125eea5afc decode groupType 2023-05-12 11:07:45 +02:00
a936734cee run ./fmt.sh to fix lint 2023-05-11 17:29:20 +02:00
dd225c8b7a move --json to --full_json; remove --human JSON output, replace with --json, but don't merge multiple values - use list instead 2023-05-11 17:29:00 +02:00
55deb40268 decode securityIdentifier attribute as SID 2023-05-10 19:53:03 +02:00
3b5f698ff5 add argument to postprocess steps and support index/slicing in DN-related hooks; document them 2023-05-10 19:52:44 +02:00
34fcd259ef improve config loading: don't modify dicts to allow yaml repeated nodes 2023-05-10 16:25:41 +02:00
f036713d71 improve some error messages 2023-05-10 16:23:32 +02:00
f1d57487be Catch CTRL+C and CTRL+D in password prompts 2023-05-10 16:15:09 +02:00
04fd42c63b Catch invalid passwords in keepass 2023-05-10 16:01:34 +02:00
1a9829b93b handle missing KeePass entry 2023-05-10 16:00:07 +02:00
21069e892e :Fix version requirement for python3.10 2023-05-02 17:47:11 +02:00
357b1ae9cb use Enum instead of StrEnum for python3.10 2023-05-02 16:32:02 +02:00
cd7cfe451c support attribute post-processing; :<len>, and DN :domain, :path, :fullpath 2023-04-28 20:48:36 +02:00
bc1eb65738 move json output format handling to main tool from decoder 2023-04-28 20:36:52 +02:00
c03374d6df move argument/column handling to decoder (prepare for more post-processing in decoder) 2023-04-28 19:46:35 +02:00
13 changed files with 473 additions and 152 deletions

View File

@ -9,7 +9,7 @@ CLI tool to query LDAP/AD servers
* Integration with password managers
* Various output formats
* Classic LDIF
* JSON stream (with detailed or simplified attribute values)
* JSON stream (with simplified or detailed attribute values)
* CSV
* Markdown table with stretched columns (for viewing in CLI/for monospaces fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
* HTML
@ -18,6 +18,34 @@ CLI tool to query LDAP/AD servers
* By default the first 1000 entries are shown, and it errors if there are more results
* Use `--all` to show all results
## Virtual attributes
`ldaptool` supports constructing new values from existing attributes by adding a `:<postprocess>` suffix (which can be chained apart from the length limit).
* Some suffixes support an argument as `:<postprocess>[<arg>]`.
* A single integer as postprocess suffix limits the length of the value; it replaces the last character of the output with `…` if it cut something off.
* Multi-valued attributes generate multiple virtual attrites; each value is processed individually. (The values are joined afterwards for table output if needed.)
### DN handling
DNs are decoded into lists of lists of `(name, value)` pairs (the inner list usually contains exactly one entry).
Attributes with a `DC` name are considered part of the "domain", everything else belongs to the "path".
(Usually a DN will start with path segments and end with domain segments.)
The path is read from back to front.
The following postprocess hooks are available:
* `domain`: extracts the domain as DNS FQDN (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com`)
* `path`: extracts the non-domain parts without names and separates them by `/` (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `Dep1/Someone`)
* `fullpath`: uses the `domain` as first segment in a path (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com/Dep1/Someone`)
* `dnslice`: extracts a "slice" from a DN (outer list only); the result is still in DN format.
`path`, `fullpath` and `dnslice` take an optional index/slice as argument, written in python syntax.
For `path` and `fullpath` this extracts only the given index/slice from the path (`fullpath` always includes the full FQDN as first segment), `dnslice` operates on the outer list of decoded (lists of) pairs:
* `dn:dnslice[1:]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `OU=Dep1,DC=example,DC=com`
* `dn:fullpath[:-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `example.com/Dep1`
* `dn:path[-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `Someone`
## Authentication, Protocol, Ports
`ldaptool` always uses TLS for password based authentication, and SASL GSS-API over non-TLS for Kerberos ones.

View File

@ -16,7 +16,7 @@ classifiers = [
]
dynamic = ["version", "description"]
requires-python = "~=3.11"
requires-python = "~=3.10"
dependencies = [
"python-ldap",
"PyYAML",

View File

@ -14,7 +14,7 @@ from ldaptool._utils import argclasses
from ldaptool._utils.ldap import Result, SizeLimitExceeded
class TableOutput(enum.StrEnum):
class TableOutput(enum.Enum):
MARKDOWN = "markdown"
CSV = "csv"
HTML = "html"
@ -42,19 +42,31 @@ class Arguments(search.Arguments):
help="Markdown table output - requires list of attributes",
),
)
table_output: typing.Optional[TableOutput] = None
html: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="HTML table output - requires list of attributes",
),
)
table_output: typing.Optional[TableOutput] = None
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
full_json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Use full json output (dn as str, attributes as list of dicts containing various represenatations)",
),
)
json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Use simple json output (dn as str, attributes map to list of human-readable strings)",
),
)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
@ -77,15 +89,15 @@ class Arguments(search.Arguments):
if self.table_output:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.json:
if self.full_json:
raise SystemExit("Can't use both table output and --json")
if self.human:
if self.json:
raise SystemExit("Can't use both table output and --human")
if self.raw:
if self.table_output:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.json or self.human:
if self.full_json or self.json:
raise SystemExit("Decode options require decode; --raw not allowed")
@ -97,8 +109,11 @@ class _Context:
try:
self.config = search.Config.load()
except Exception as e:
raise SystemExit(f"config error: {e}")
self.arguments = arguments_p.from_args(args)
raise SystemExit(f"config error: {e!r}")
try:
self.arguments = arguments_p.from_args(args)
except decode.InvalidStep as e:
raise SystemExit(f"invalid arguments: {e}")
def run(self) -> None:
# starting the search sets the base we want to print
@ -141,7 +156,7 @@ class _Context:
continue
# normal entry
assert not isinstance(entry, list)
obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
yield tuple(obj.get(key, "") for key in column_keys)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")
@ -172,7 +187,7 @@ class _Context:
num_responses = 0
num_entries = 0
ldif_output = not (self.arguments.json or self.arguments.human)
ldif_output = not (self.arguments.full_json or self.arguments.json)
if ldif_output:
print("# extended LDIF")
@ -201,8 +216,13 @@ class _Context:
# normal entry
assert not isinstance(entry, list)
num_entries += 1
obj = decoder.read(dn=dn, entry=entry)
decoder.emit(dn=dn, entry=obj)
if ldif_output:
decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
elif self.arguments.json:
decoder.read_and_emit_simple_json(dn=dn, entry=entry, file=stream)
else:
assert self.arguments.full_json
decoder.read_and_emit_full_json(dn=dn, entry=entry, file=stream)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")

View File

@ -74,6 +74,9 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")
@dataclasses.dataclass(slots=True, kw_only=True)
class BaseArguments:
def __post_init__(self) -> None:
pass
@classmethod
def add_fields_to_parser(
cls: type[_TArgs],

View File

@ -33,19 +33,26 @@ class DNInfo:
def domain(self) -> str:
return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc")
def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str:
return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc")
def _path(self, *, escape: typing.Callable[[str], str], sep: str, selection: slice = slice(None)) -> str:
rev_flattened = [ava[1] for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc"]
return sep.join(value for value in rev_flattened[selection])
def sliced_path(self, selection: slice, /) -> str:
return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/", selection=selection)
@functools.cached_property
def path(self) -> str:
return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/")
return self.sliced_path(slice(None))
@property
def full_path(self) -> str:
def sliced_full_path(self, selection: slice, /) -> str:
domain = self.domain
path = self.path
path = self.sliced_path(selection)
if not path:
return self.domain
if not domain:
return self.path
return f"{domain}/{path}"
@property
def full_path(self) -> str:
return self.sliced_full_path(slice(None))

View File

@ -1,10 +1,12 @@
from __future__ import annotations
from ._decoder import Attribute, Decoder
from ._postprocess import InvalidStep
from .arguments import Arguments
__all__ = [
"Arguments",
"Attribute",
"Decoder",
"InvalidStep",
]

View File

@ -8,8 +8,6 @@ import sys
import typing
import uuid
from ldaptool._utils.dninfo import DNInfo
from . import _types
from .arguments import Arguments
@ -102,8 +100,15 @@ class Attribute:
except Exception:
return
def _try_decode_grouptype(self) -> None:
if self.utf8_clean:
try:
self.decoded = _types.grouptype.parse(self.utf8_clean.strip())
except Exception:
return
def _try_decode(self, args: Arguments) -> None:
if self.name in ("objectSid",):
if self.name in ("objectSid", "securityIdentifier"):
self._try_decode_sid()
elif self.name in ("msExchMailboxGuid", "objectGUID"):
self._try_decode_uuid()
@ -117,21 +122,23 @@ class Attribute:
self._try_decode_timestamp(args)
elif self.name == "userAccountControl":
self._try_decode_uac()
elif self.name == "groupType":
self._try_decode_grouptype()
@property
def _base64_value(self) -> str:
return base64.b64encode(self.raw).decode("ascii")
def print(self) -> None:
def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
if not self.decoded is None:
comment = self.utf8_clean
if comment is None:
comment = self._base64_value
print(f"{self.name}: {self.decoded} # {comment}")
print(f"{self.name}: {self.decoded} # {comment}", file=file)
elif not self.utf8_clean is None:
print(f"{self.name}: {self.utf8_clean}")
print(f"{self.name}: {self.utf8_clean}", file=file)
else:
print(f"{self.name}:: {self._base64_value}")
print(f"{self.name}:: {self._base64_value}", file=file)
def to_json(self) -> dict[str, typing.Any]:
item: dict[str, typing.Any] = {}
@ -175,55 +182,75 @@ class Decoder:
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
for name, raw_values in entry.items()
}
if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
dninfo = DNInfo(dn=dn)
if self.arguments.dndomain:
decoded_entry["dndomain"] = [
Attribute.fake_attribute("dndomain", dninfo.domain),
]
if self.arguments.dnpath:
decoded_entry["dnpath"] = [
Attribute.fake_attribute("dnpath", dninfo.path),
]
if self.arguments.dnfullpath:
decoded_entry["dnfullpath"] = [
Attribute.fake_attribute("dnfullpath", dninfo.full_path),
for attr, post_processes in self.arguments.post_process.items():
if attr == "dn":
values = [dn]
else:
attrs = decoded_entry.get(attr, None)
if attrs is None:
continue
values = [at.human() for at in attrs]
for column, post_process in post_processes.items():
decoded_entry[column] = [
Attribute.fake_attribute(column, post_process.process(value)) for value in values
]
return decoded_entry
def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in entry.items():
for name, attrs in obj.items():
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
return emit
def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
def simple_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in entry.items():
for name, attrs in obj.items():
emit[name] = [attr.human() for attr in attrs]
return emit
def emit_simple_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
emit = self.simple_json(dn=dn, obj=obj)
json.dump(emit, file, ensure_ascii=False)
print(file=file) # terminate output dicts by newline
def read_and_emit_simple_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_simple_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def full_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in obj.items():
emit[name] = [attr.to_json() for attr in attrs]
return emit
def _emit_json(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human:
emit = self.human(dn=dn, entry=entry)
def emit_full_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
emit = self.full_json(dn=dn, obj=obj)
json.dump(emit, file, ensure_ascii=False)
print(file=file) # terminate output dicts by newline
def read_and_emit_full_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_full_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
print(f"dn: {dn}", file=file)
attrs: typing.Optional[list[Attribute]]
if not self.arguments.attributes:
# show all attributes - use order from server
for attrs in obj.values():
for attr in attrs:
attr.print(file=file)
else:
emit = self.json(dn=dn, entry=entry)
json.dump(emit, sys.stdout, ensure_ascii=False)
print() # terminate output dicts by newline
# only selected columns; use given order
for column in self.arguments.columns_keys:
if column == "dn":
continue # already printed dn
attrs = obj.get(column, None)
if attrs is None:
continue
for attr in attrs:
attr.print(file=file)
print(file=file) # separate entries with newlines
def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None:
print(f"dn: {dn}")
for attrs in entry.values():
for attr in attrs:
attr.print()
print() # separate entries with newlines
def emit(self, *, dn: str, entry: TDecoded) -> None:
if self.arguments.human or self.arguments.json:
self._emit_json(dn=dn, entry=entry)
else:
self._emit_ldif(dn=dn, entry=entry)
def handle(self, *, dn: str, entry: TEntry) -> None:
entry_attrs = self.read(dn=dn, entry=entry)
self.emit(dn=dn, entry=entry_attrs)
def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

View File

@ -0,0 +1,195 @@
from __future__ import annotations
import abc
import dataclasses
import typing
import ldap.dn
from ldaptool._utils.dninfo import DNInfo
class Step(abc.ABC):
__slots__ = ()
@abc.abstractmethod
def step(self, value: str) -> str:
...
def _args_to_slice(args: str) -> slice:
args = args.strip()
if not args:
return slice(None)
params: list[typing.Optional[int]] = []
for arg in args.split(":"):
arg = arg.strip()
if arg:
params.append(int(arg))
else:
params.append(None)
if len(params) == 1:
assert isinstance(params[0], int)
ndx = params[0]
if ndx == -1:
return slice(ndx, None) # from last element to end - still exactly one element
# this doesn't work for ndx == -1: slice(-1, 0) is always empty. otherwise it should return [ndx:][:1].
return slice(ndx, ndx + 1)
return slice(*params)
@dataclasses.dataclass(slots=True)
class MaxLength(Step):
limit: int
def step(self, value: str) -> str:
if not self.limit or len(value) <= self.limit:
return value
return value[: self.limit - 1] + ""
@dataclasses.dataclass(slots=True)
class DNDomain(Step):
def __init__(self, args: str) -> None:
if args:
raise ValueError(":domain doesn't support an argument")
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.domain
@dataclasses.dataclass(slots=True)
class DNPath(Step):
path_slice: slice
def __init__(self, args: str) -> None:
self.path_slice = _args_to_slice(args)
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.sliced_path(self.path_slice)
@dataclasses.dataclass(slots=True)
class DNFullPath(Step):
path_slice: slice
def __init__(self, args: str) -> None:
self.path_slice = _args_to_slice(args)
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return dninfo.sliced_full_path(self.path_slice)
@dataclasses.dataclass(slots=True)
class DNSlice(Step):
slice: slice
def __init__(self, args: str) -> None:
self.slice = _args_to_slice(args)
def step(self, value: str) -> str:
try:
dninfo = DNInfo(dn=value)
except Exception:
# not a valid DN -> no processing
return value
return ldap.dn.dn2str(dninfo.parts[self.slice]) # type: ignore
_STEPS: dict[str, typing.Callable[[str], Step]] = {
"domain": DNDomain,
"path": DNPath,
"fullpath": DNFullPath,
"dnslice": DNSlice,
}
class InvalidStep(Exception):
pass
@dataclasses.dataclass(slots=True)
class PostProcess:
steps: list[Step]
def process(self, value: str) -> str:
for step in self.steps:
value = step.step(value)
return value
def parse_steps(steps: str) -> PostProcess:
result: list[Step] = []
cur_id_start = 0
cur_args_start = -1
current_id = ""
current_args = ""
count_brackets = 0
step_done = False
def handle_step() -> None:
nonlocal cur_id_start, cur_args_start, current_id, current_args, step_done
assert step_done
step_i = _STEPS.get(current_id, None)
if step_i is None:
try:
max_len = int(current_id)
result.append(MaxLength(max_len))
except ValueError:
raise InvalidStep(f"Unknown post-processing step {current_id!r}")
else:
result.append(step_i(current_args))
cur_id_start = pos + 1
cur_args_start = -1
current_id = ""
current_args = ""
step_done = False
for pos, char in enumerate(steps):
if step_done:
if char != ":":
raise InvalidStep(f"Require : after step, found {char!r} at pos {pos}")
handle_step()
elif char == "[":
if count_brackets == 0:
# end of identifier
current_id = steps[cur_id_start:pos]
cur_args_start = pos + 1
count_brackets += 1
elif char == "]":
count_brackets -= 1
if count_brackets == 0:
current_args = steps[cur_args_start:pos]
step_done = True
elif count_brackets:
continue
elif not char.isalnum():
raise InvalidStep(f"Expecting either alphanumeric, ':' or '[', got {char!r} at {pos}")
if not step_done:
current_id = steps[cur_id_start:]
if current_id:
step_done = True
if step_done:
handle_step()
return PostProcess(result)

View File

@ -1,8 +1,9 @@
from __future__ import annotations
from . import sid, timestamp, uac
from . import grouptype, sid, timestamp, uac
__all__ = [
"grouptype",
"sid",
"timestamp",
"uac",

View File

@ -0,0 +1,29 @@
from __future__ import annotations
import enum
import typing
class GroupTypeFlags(enum.IntFlag):
SYSTEM = 0x00000001
SCOPE_GLOBAL = 0x00000002
SCOPE_DOMAIN = 0x00000004
SCOPE_UNIVERSAL = 0x00000008
APP_BASIC = 0x00000010
APP_QUERY = 0x00000020
SECURITY = 0x80000000 # otherwise distribution
def flags(self) -> list[GroupTypeFlags]:
# ignore "uncovered" bits for now
value = self.value
members = []
for member in GroupTypeFlags:
member_value = member.value
if member_value and member_value & value == member_value:
members.append(member)
return members
def parse(value: str) -> str:
members = GroupTypeFlags(int(value)).flags()
return ", ".join(typing.cast(str, member.name) for member in members)

View File

@ -1,47 +1,78 @@
from __future__ import annotations
import argparse
import dataclasses
from ldaptool._utils import argclasses
from . import _postprocess
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(argclasses.BaseArguments):
json: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use full json output"),
)
human: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
)
columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
columns_keys: list[str] = dataclasses.field(default_factory=list) # lower case column names
attributes: list[str] = dataclasses.field(default_factory=list)
human_separator: str = dataclasses.field(
default=", ",
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
)
dateonly: bool = dataclasses.field(
default=True,
metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
)
dndomain: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"),
)
dnpath: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="""
Whether to export a virtual dnpath attribute
('/' joined values of reversed DN without DNS labels)
"""
),
)
dnfullpath: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="""
Whether to export a virtual dnfullpath attribute
('/' joined values of reversed DN; DNS domain as first label)
"""
),
)
post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
# extract special attribute names
all_attributes = False
attributes_set: set[str] = set()
self.columns_keys = []
for column in list(self.columns):
column = column.lower()
if column == "*":
# '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
self.columns.remove("*")
all_attributes = True
continue
self.columns_keys.append(column)
if column == "dndomain":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("domain")
attributes_set.add("dn")
elif column == "dnpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("path")
attributes_set.add("dn")
elif column == "dnfullpath":
self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("fullpath")
attributes_set.add("dn")
else:
col_parts = column.split(":", maxsplit=1)
attributes_set.add(col_parts[0])
if len(col_parts) == 2:
source, steps = col_parts
self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(steps)
if all_attributes:
self.attributes = []
else:
self.attributes = list(attributes_set)

View File

@ -1,6 +1,5 @@
from __future__ import annotations
import argparse
import dataclasses
import typing
@ -8,28 +7,8 @@ import ldaptool.decode.arguments
from ldaptool._utils import argclasses
def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
parser.add_argument(
metavar="attributes",
dest=dest,
nargs="*",
help="""
Attributes to lookup (and columns to display in tables).
Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
""",
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(ldaptool.decode.arguments.Arguments):
# overwrite fields for fake attributes to remove them from argparse;
# we enable those based on the attribute list
dndomain: bool = False
dnpath: bool = False
dnfullpath: bool = False
attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
columns: list[str] = dataclasses.field(default_factory=list)
filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
find: typing.Optional[str] = dataclasses.field(
default=None,
@ -75,6 +54,8 @@ class Arguments(ldaptool.decode.arguments.Arguments):
)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
if not self.filter is None:
if not self.find is None:
raise SystemExit("Can't use both --find and --filter")
@ -86,19 +67,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):
else:
# probably doesn't like empty filter?
self.filter = "(objectClass=*)"
# extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
# create fake attributes on demand
if attributes_set.pop("dndomain", ""):
self.dndomain = True
if attributes_set.pop("dnpath", ""):
self.dnpath = True
if attributes_set.pop("dnfullpath", ""):
self.dnfullpath = True
# store remaining attributes (with original case)
self.attributes = list(attributes_set.values())
if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"]

View File

@ -7,6 +7,7 @@ import os
import os.path
import shlex
import subprocess
import sys
import typing
import yaml
@ -28,13 +29,13 @@ class Realm:
@staticmethod
def load(name: str, data: typing.Any) -> Realm:
assert isinstance(data, dict)
domain = data.pop("domain")
servers = data.pop("servers").split()
forest_root_domain = data.pop("forest_root_domain", domain)
account = data.pop("account", None)
password_file = data.pop("password_file", None)
password_folder = data.pop("password_folder", None)
assert isinstance(data, dict), f"Realm section isn't a dictionary: {data!r}"
domain = data["domain"]
servers = data["servers"].split()
forest_root_domain = data.get("forest_root_domain", domain)
account = data.get("account", None)
password_file = data.get("password_file", None)
password_folder = data.get("password_folder", None)
return Realm(
name=name,
domain=domain,
@ -101,8 +102,8 @@ class Keyringer(PasswordManager):
@staticmethod
def load(data: typing.Any) -> Keyringer:
assert isinstance(data, dict)
keyring = data.pop("keyring")
folder = data.pop("folder")
keyring = data["keyring"]
folder = data.get("folder", "")
return Keyringer(keyring=keyring, folder=folder)
def get_password(self, password_name: str) -> str:
@ -145,9 +146,17 @@ class Keepass(PasswordManager):
def get_password(self, password_name: str) -> str:
import pykeepass # already made sure it is avaiable above
password = getpass.getpass(f"KeePass password for database {self.database}: ")
kp = pykeepass.PyKeePass(self.database, password=password)
while True:
try:
password = getpass.getpass(f"KeePass password for database {self.database}: ")
kp = pykeepass.PyKeePass(self.database, password=password)
break
except pykeepass.exceptions.CredentialsError:
print("Invalid password", file=sys.stderr)
entry = kp.find_entries(username=password_name, first=True)
if not entry:
raise SystemExit(f"no KeePass entry for {password_name!r} found")
return entry.password # type: ignore
@ -190,8 +199,8 @@ class Config:
with open(conf_path) as f:
data = yaml.safe_load(f)
assert isinstance(data, dict)
assert "realms" in data
realms_data = data.pop("realms")
assert "realms" in data, "Missing realms section in config"
realms_data = data["realms"]
assert isinstance(realms_data, dict)
realms = {}
for name, realm_data in realms_data.items():
@ -201,15 +210,15 @@ class Config:
if "keyringer" in data:
if password_manager:
raise ValueError("Can only set a single password manager")
password_manager = Keyringer.load(data.pop("keyringer"))
password_manager = Keyringer.load(data["keyringer"])
if "keepass" in data:
if password_manager:
raise ValueError("Can only set a single password manager")
password_manager = Keepass.load(data.pop("keepass"))
password_manager = Keepass.load(data["keepass"])
if "password-script" in data:
if password_manager:
raise ValueError("Can only set a single password manager")
password_manager = PasswordScript.load(data.pop("password-script"))
password_manager = PasswordScript.load(data["password-script"])
return Config(realms=realms, password_manager=password_manager)
@ -220,7 +229,11 @@ class Config:
"""
if realm.account is None:
raise RuntimeError("Can't get password without acccount - should use kerberos instead")
if self.password_manager:
return self.password_manager.get_password(realm.password_name)
return getpass.getpass(f"Enter password for {realm.password_name}: ")
try:
if self.password_manager:
return self.password_manager.get_password(realm.password_name)
return getpass.getpass(f"Enter password for {realm.password_name}: ")
except (KeyboardInterrupt, EOFError):
raise SystemExit("Password prompt / retrieval aborted")