Compare commits: ldaptool-0...debian/0.4 (8 commits)

Commits (SHA1):

18a27b195e
6856c452e1
d9803c226e
cbcdb36579
54a23e8060
d51d714352
474ee9383f
71ab3043f4

README.md (30 lines changed)
@@ -9,7 +9,7 @@ CLI tool to query LDAP/AD servers
 * Integration with password managers
 * Various output formats
   * Classic LDIF
-  * JSON stream (with simplified or detailed attribute values)
+  * JSON stream (with detailed or simplified attribute values)
   * CSV
   * Markdown table with stretched columns (for viewing in CLI / for monospace fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
   * HTML
@@ -18,34 +18,6 @@ CLI tool to query LDAP/AD servers
 * By default the first 1000 entries are shown, and it errors if there are more results
   * Use `--all` to show all results
 
-## Virtual attributes
-
-`ldaptool` supports constructing new values from existing attributes by adding a `:<postprocess>` suffix (which can be chained, apart from the length limit).
-
-* Some suffixes support an argument as `:<postprocess>[<arg>]`.
-* A single integer as postprocess suffix limits the length of the value; it replaces the last character of the output with `…` if it cut something off.
-* Multi-valued attributes generate multiple virtual attributes; each value is processed individually. (The values are joined afterwards for table output if needed.)
-
-### DN handling
-
-DNs are decoded into lists of lists of `(name, value)` pairs (the inner list usually contains exactly one entry).
-Attributes with a `DC` name are considered part of the "domain"; everything else belongs to the "path".
-(Usually a DN will start with path segments and end with domain segments.)
-The path is read from back to front.
-
-The following postprocess hooks are available:
-* `domain`: extracts the domain as DNS FQDN (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com`)
-* `path`: extracts the non-domain parts without names and separates them by `/` (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `Dep1/Someone`)
-* `fullpath`: uses the `domain` as first segment in a path (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com/Dep1/Someone`)
-* `dnslice`: extracts a "slice" from a DN (outer list only); the result is still in DN format.
-
-`path`, `fullpath` and `dnslice` take an optional index/slice as argument, written in Python syntax.
-For `path` and `fullpath` this extracts only the given index/slice from the path (`fullpath` always includes the full FQDN as first segment); `dnslice` operates on the outer list of decoded (lists of) pairs:
-
-* `dn:dnslice[1:]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `OU=Dep1,DC=example,DC=com`
-* `dn:fullpath[:-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `example.com/Dep1`
-* `dn:path[-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `Someone`
-
 ## Authentication, Protocol, Ports
 
 `ldaptool` always uses TLS for password based authentication, and SASL GSS-API over non-TLS for Kerberos ones.
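The "DN handling" rules described in the removed README section map to a few lines of Python. This is an illustrative sketch only: it splits on `,` and `=` naively, whereas the real tool decodes DNs via `ldap.dn` and handles escaping; the function names here are made up for the example.

```python
def decode_dn(dn: str) -> list[tuple[str, str]]:
    # naive split; the real decoder (ldap.dn.str2dn) handles escaped ',' and '='
    return [tuple(part.split("=", 1)) for part in dn.split(",")]

def dn_domain(dn: str) -> str:
    # DC components, joined front to back, form the DNS FQDN
    return ".".join(v for k, v in decode_dn(dn) if k.lower() == "dc")

def dn_path(dn: str) -> str:
    # non-DC values, read back to front, separated by '/'
    return "/".join(v for k, v in reversed(decode_dn(dn)) if k.lower() != "dc")

def dn_fullpath(dn: str) -> str:
    # the domain becomes the first path segment
    domain, path = dn_domain(dn), dn_path(dn)
    return f"{domain}/{path}" if domain and path else domain or path

dn = "CN=Someone,OU=Dep1,DC=example,DC=com"
print(dn_domain(dn))    # example.com
print(dn_path(dn))      # Dep1/Someone
print(dn_fullpath(dn))  # example.com/Dep1/Someone
```

The outputs match the three README examples for `domain`, `path` and `fullpath`.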
debian/changelog (vendored, new file, 38 lines)
@@ -0,0 +1,38 @@
+ldaptool (0.4-1) unstable; urgency=medium
+
+  * move argument/column handling to decoder (prepare for more post-processing in decoder)
+  * move json output format handling to main tool from decoder
+  * support attribute post-processing; :<len>, and DN :domain, :path, :fullpath
+  * use Enum instead of StrEnum for python3.10
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Tue, 02 May 2023 16:54:00 +0200
+
+ldaptool (0.3-1) unstable; urgency=medium
+
+  * ldaptool: move output arguments from search to main
+  * run sort internally, refactor table output into separate method
+  * refactor table variant handling
+  * add html output format
+  * README.md: document csvkit dependency
+  * debian: require csvkit (markdown table is an essential feature)
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 19:31:37 +0200
+
+ldaptool (0.2-1) unstable; urgency=medium
+
+  * README.md: fix typo
+  * enable tls unless kerberos is used (SASL GSS-API doesn't seem to work over TLS)
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 17:21:35 +0200
+
+ldaptool (0.1-1) unstable; urgency=medium
+
+  * Initial release.
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:30 +0200
+
+ldaptool (0.1-0) unstable; urgency=medium
+
+  * Stub ITP lintian.
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:29 +0200
debian/control (vendored, new file, 43 lines)
@@ -0,0 +1,43 @@
+Source: ldaptool
+Section: net
+Priority: optional
+Maintainer: Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
+Rules-Requires-Root: no
+Build-Depends:
+ debhelper-compat (= 13),
+ pybuild-plugin-pyproject,
+ flit,
+ dh-sequence-python3,
+ python3,
+ python3-ldap,
+ python3-yaml,
+ python3-pykeepass,
+#Testsuite: autopkgtest-pkg-python
+Standards-Version: 4.6.2
+Homepage: https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool
+
+Package: python3-ldaptool
+Architecture: all
+Depends:
+ ${python3:Depends},
+ ${misc:Depends},
+Recommends:
+ python3-pykeepass,
+Description: CLI tool to run ldap queries
+ CLI tool to query LDAP/AD servers, featuring various output formats
+ and a configuration for different realms.
+ .
+ This package installs the library for Python 3.
+
+Package: ldaptool
+Architecture: all
+Depends:
+ python3-ldaptool (=${binary:Version}),
+ ${python3:Depends},
+ ${misc:Depends},
+ csvkit,
+Description: CLI tool to run ldap queries
+ CLI tool to query LDAP/AD servers, featuring various output formats
+ and a configuration for different realms.
+ .
+ This package installs the script.
debian/copyright (vendored, new file, 27 lines)
@@ -0,0 +1,27 @@
+Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
+Source: <https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool>
+Upstream-Name: ldaptool
+
+Files:
+ *
+Copyright:
+ 2023 Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
+ 2023 Daniel Dizdarevic <daniel.dizdarevic@tik.uni-stuttgart.de>
+License: MIT
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+ .
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+ .
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
debian/gbp.conf (vendored, new file, 5 lines)
@@ -0,0 +1,5 @@
+[DEFAULT]
+pristine-tar = False
+upstream-branch = main
+debian-branch = debian
+upstream-tag = ldaptool-%(version)s
debian/rules (vendored, new executable file, 13 lines)
@@ -0,0 +1,13 @@
+#!/usr/bin/make -f
+
+export PYBUILD_NAME=ldaptool
+
+%:
+	dh $@ --buildsystem=pybuild
+
+# we want /usr/bin/ldaptool in a separate package
+override_dh_auto_install:
+	dh_auto_install
+
+	mkdir -p debian/ldaptool/usr
+	mv debian/python3-ldaptool/usr/bin debian/ldaptool/usr/
debian/source/format (vendored, new file, 1 line)
@@ -0,0 +1 @@
+3.0 (quilt)
debian/source/options (vendored, new file, 1 line)
@@ -0,0 +1 @@
+extend-diff-ignore = "^[^/]*[.]egg-info/|^[.]vscode|/__pycache__/|^venv/|^.mypy_cache/"
@@ -16,7 +16,7 @@ classifiers = [
 ]
 dynamic = ["version", "description"]
 
-requires-python = "~=3.10"
+requires-python = "~=3.11"
 dependencies = [
     "python-ldap",
     "PyYAML",
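For context on the `requires-python` bump: `~=3.11` is a PEP 440 "compatible release" pin, equivalent to `>= 3.11, < 4`, so the change drops Python 3.10 support. The rule can be sketched without any third-party dependency (a hypothetical helper, handling two-component versions only):

```python
def compatible_release(version: str, pin: str) -> bool:
    # "~=3.11" accepts versions >= 3.11 where only the last component varies
    v = tuple(int(x) for x in version.split("."))
    p = tuple(int(x) for x in pin.split("."))
    return v >= p and v[: len(p) - 1] == p[: len(p) - 1]

print(compatible_release("3.10", "3.11"))  # False
print(compatible_release("3.11", "3.11"))  # True
print(compatible_release("3.12", "3.11"))  # True
```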
@@ -55,17 +55,13 @@ class Arguments(search.Arguments):
             help="Sorted table output - defaults to markdown --table unless --csv is given",
         ),
     )
-    full_json: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="Use full json output (dn as str, attributes as list of dicts containing various representations)",
-        ),
-    )
     json: bool = dataclasses.field(
         default=False,
-        metadata=argclasses.arg(
-            help="Use simple json output (dn as str, attributes map to list of human-readable strings)",
-        ),
+        metadata=argclasses.arg(help="Use full json output"),
     )
+    human: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
+    )
 
     def __post_init__(self) -> None:
@@ -89,15 +85,15 @@ class Arguments(search.Arguments):
         if self.table_output:
             if not self.columns:
                 raise SystemExit("Table output requires attributes")
-            if self.full_json:
-                raise SystemExit("Can't use both table output and --json")
             if self.json:
                 raise SystemExit("Can't use both table output and --json")
+            if self.human:
+                raise SystemExit("Can't use both table output and --human")
 
         if self.raw:
             if self.table_output:
                 raise SystemExit("Table output requires decode; --raw not allowed")
-            if self.full_json or self.json:
+            if self.json or self.human:
                 raise SystemExit("Decode options require decode; --raw not allowed")
 
 
@@ -109,7 +105,7 @@ class _Context:
         try:
             self.config = search.Config.load()
         except Exception as e:
-            raise SystemExit(f"config error: {e!r}")
+            raise SystemExit(f"config error: {e}")
         try:
             self.arguments = arguments_p.from_args(args)
         except decode.InvalidStep as e:
@@ -187,7 +183,7 @@ class _Context:
         num_responses = 0
         num_entries = 0
 
-        ldif_output = not (self.arguments.full_json or self.arguments.json)
+        ldif_output = not (self.arguments.json or self.arguments.human)
 
         if ldif_output:
             print("# extended LDIF")
@@ -218,11 +214,11 @@ class _Context:
                     num_entries += 1
                     if ldif_output:
                         decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
-                    elif self.arguments.json:
-                        decoder.read_and_emit_simple_json(dn=dn, entry=entry, file=stream)
+                    elif self.arguments.human:
+                        decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
                     else:
-                        assert self.arguments.full_json
-                        decoder.read_and_emit_full_json(dn=dn, entry=entry, file=stream)
+                        assert self.arguments.json
+                        decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
         except SizeLimitExceeded as e:
             raise SystemExit(f"Error: {e}")
@@ -33,26 +33,19 @@ class DNInfo:
     def domain(self) -> str:
         return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc")
 
-    def _path(self, *, escape: typing.Callable[[str], str], sep: str, selection: slice = slice(None)) -> str:
-        rev_flattened = [ava[1] for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc"]
-        return sep.join(value for value in rev_flattened[selection])
-
-    def sliced_path(self, selection: slice, /) -> str:
-        return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/", selection=selection)
+    def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str:
+        return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc")
 
     @functools.cached_property
     def path(self) -> str:
-        return self.sliced_path(slice(None))
+        return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/")
 
-    def sliced_full_path(self, selection: slice, /) -> str:
+    @property
+    def full_path(self) -> str:
         domain = self.domain
-        path = self.sliced_path(selection)
+        path = self.path
         if not path:
             return self.domain
         if not domain:
             return self.path
         return f"{domain}/{path}"
-
-    @property
-    def full_path(self) -> str:
-        return self.sliced_full_path(slice(None))
@@ -101,7 +101,7 @@ class Attribute:
             return
 
     def _try_decode(self, args: Arguments) -> None:
-        if self.name in ("objectSid", "securityIdentifier"):
+        if self.name in ("objectSid",):
            self._try_decode_sid()
         elif self.name in ("msExchMailboxGuid", "objectGUID"):
             self._try_decode_uuid()
@@ -192,30 +192,30 @@ class Decoder:
     def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
         emit: dict[str, typing.Any] = dict(dn=dn)
         for name, attrs in obj.items():
-            emit[name] = [attr.human() for attr in attrs]
+            emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
         return emit
 
-    def emit_simple_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+    def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
         emit = self.human(dn=dn, obj=obj)
         json.dump(emit, file, ensure_ascii=False)
         print(file=file)  # terminate output dicts by newline
 
-    def read_and_emit_simple_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
-        self.emit_simple_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
+    def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
 
-    def full_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
+    def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
         emit: dict[str, typing.Any] = dict(dn=dn)
         for name, attrs in obj.items():
             emit[name] = [attr.to_json() for attr in attrs]
         return emit
 
-    def emit_full_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
-        emit = self.full_json(dn=dn, obj=obj)
+    def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.json(dn=dn, obj=obj)
         json.dump(emit, file, ensure_ascii=False)
         print(file=file)  # terminate output dicts by newline
 
-    def read_and_emit_full_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
-        self.emit_full_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
+    def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
 
     def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
         print(f"dn: {dn}", file=file)
@@ -2,9 +2,6 @@ from __future__ import annotations
 
 import abc
 import dataclasses
-import typing
 
-import ldap.dn
-
 from ldaptool._utils.dninfo import DNInfo
 
@@ -17,27 +14,6 @@ class Step(abc.ABC):
         ...
 
 
-def _args_to_slice(args: str) -> slice:
-    args = args.strip()
-    if not args:
-        return slice(None)
-    params: list[typing.Optional[int]] = []
-    for arg in args.split(":"):
-        arg = arg.strip()
-        if arg:
-            params.append(int(arg))
-        else:
-            params.append(None)
-    if len(params) == 1:
-        assert isinstance(params[0], int)
-        ndx = params[0]
-        if ndx == -1:
-            return slice(ndx, None)  # from last element to end - still exactly one element
-        # this doesn't work for ndx == -1: slice(-1, 0) is always empty. otherwise it should return [ndx:][:1].
-        return slice(ndx, ndx + 1)
-    return slice(*params)
-
-
 @dataclasses.dataclass(slots=True)
 class MaxLength(Step):
     limit: int
@@ -50,10 +26,6 @@ class MaxLength(Step):
 
 @dataclasses.dataclass(slots=True)
 class DNDomain(Step):
-    def __init__(self, args: str) -> None:
-        if args:
-            raise ValueError(":domain doesn't support an argument")
-
     def step(self, value: str) -> str:
         try:
             dninfo = DNInfo(dn=value)
@@ -65,57 +37,30 @@ class DNDomain(Step):
 
 @dataclasses.dataclass(slots=True)
 class DNPath(Step):
-    path_slice: slice
-
-    def __init__(self, args: str) -> None:
-        self.path_slice = _args_to_slice(args)
-
     def step(self, value: str) -> str:
         try:
             dninfo = DNInfo(dn=value)
         except Exception:
             # not a valid DN -> no processing
             return value
-        return dninfo.sliced_path(self.path_slice)
+        return dninfo.path
 
 
 @dataclasses.dataclass(slots=True)
 class DNFullPath(Step):
-    path_slice: slice
-
-    def __init__(self, args: str) -> None:
-        self.path_slice = _args_to_slice(args)
-
     def step(self, value: str) -> str:
         try:
             dninfo = DNInfo(dn=value)
         except Exception:
             # not a valid DN -> no processing
             return value
-        return dninfo.sliced_full_path(self.path_slice)
+        return dninfo.full_path
 
 
-@dataclasses.dataclass(slots=True)
-class DNSlice(Step):
-    slice: slice
-
-    def __init__(self, args: str) -> None:
-        self.slice = _args_to_slice(args)
-
-    def step(self, value: str) -> str:
-        try:
-            dninfo = DNInfo(dn=value)
-        except Exception:
-            # not a valid DN -> no processing
-            return value
-        return ldap.dn.dn2str(dninfo.parts[self.slice])  # type: ignore
-
-
-_STEPS: dict[str, typing.Callable[[str], Step]] = {
-    "domain": DNDomain,
-    "path": DNPath,
-    "fullpath": DNFullPath,
-    "dnslice": DNSlice,
+_STEPS = {
+    "domain": DNDomain(),
+    "path": DNPath(),
+    "fullpath": DNFullPath(),
 }
 
@@ -133,63 +78,19 @@ class PostProcess:
         return value
 
 
-def parse_steps(steps: str) -> PostProcess:
-    result: list[Step] = []
-
-    cur_id_start = 0
-    cur_args_start = -1
-    current_id = ""
-    current_args = ""
-    count_brackets = 0
-    step_done = False
-
-    def handle_step() -> None:
-        nonlocal cur_id_start, cur_args_start, current_id, current_args, step_done
-        assert step_done
-
-        step_i = _STEPS.get(current_id, None)
-        if step_i is None:
-            try:
-                max_len = int(current_id)
-                result.append(MaxLength(max_len))
-            except ValueError:
-                raise InvalidStep(f"Unknown post-processing step {current_id!r}")
-        else:
-            result.append(step_i(current_args))
-
-        cur_id_start = pos + 1
-        cur_args_start = -1
-        current_id = ""
-        current_args = ""
-        step_done = False
-
-    for pos, char in enumerate(steps):
-        if step_done:
-            if char != ":":
-                raise InvalidStep(f"Require : after step, found {char!r} at pos {pos}")
-            handle_step()
-        elif char == "[":
-            if count_brackets == 0:
-                # end of identifier
-                current_id = steps[cur_id_start:pos]
-                cur_args_start = pos + 1
-            count_brackets += 1
-        elif char == "]":
-            count_brackets -= 1
-            if count_brackets == 0:
-                current_args = steps[cur_args_start:pos]
-                step_done = True
-        elif count_brackets:
-            continue
-        elif not char.isalnum():
-            raise InvalidStep(f"Expecting either alphanumeric, ':' or '[', got {char!r} at {pos}")
-
-    if not step_done:
-        current_id = steps[cur_id_start:]
-        if current_id:
-            step_done = True
-
-    if step_done:
-        handle_step()
-
+def parse_steps(steps: list[str]) -> PostProcess:
+    max_len = 0
+    try:
+        max_len = int(steps[-1])
+        steps.pop()
+    except ValueError:
+        pass
+    result = []
+    for step in steps:
+        step_i = _STEPS.get(step, None)
+        if step_i is None:
+            raise InvalidStep(f"Unknown post-processing step {step!r}")
+        result.append(step_i)
+    if max_len:
+        result.append(MaxLength(max_len))
     return PostProcess(result)
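The rewritten `parse_steps` replaces the character-by-character bracket parser with a much simpler scheme: the caller pre-splits the suffix chain on `:`, a trailing integer becomes a length limit, and every other name must match a ready-made step object. A standalone sketch of that logic, using `str.upper`/`str.lower` as hypothetical stand-ins for the real DN steps:

```python
# Sketch of the list-based step parsing described above; not the tool's code.
STEPS = {"upper": str.upper, "lower": str.lower}

def parse_steps(names: list[str]):
    max_len = 0
    try:
        max_len = int(names[-1])  # a trailing integer becomes a length limit
        names.pop()
    except ValueError:
        pass
    funcs = [STEPS[n] for n in names]  # unknown names raise KeyError here
    if max_len:
        # truncate, replacing the last kept character with '…' (like MaxLength)
        funcs.append(lambda v: v if len(v) <= max_len else v[:max_len - 1] + "…")

    def apply(value: str) -> str:
        for f in funcs:
            value = f(value)
        return value

    return apply

proc = parse_steps(["upper", "5"])
print(proc("example.com"))  # EXAM…
```

Note the trade-off the diff makes: per-step arguments (`path[1:]`, `dnslice[...]`) cannot be expressed in this split-on-`:` form, which is consistent with the slice support being removed elsewhere in this comparison.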
@@ -57,20 +57,20 @@ class Arguments(argclasses.BaseArguments):
                 self.columns_keys.append(column)
 
                 if column == "dndomain":
-                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("domain")
+                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"])
                     attributes_set.add("dn")
                 elif column == "dnpath":
-                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("path")
+                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"])
                     attributes_set.add("dn")
                 elif column == "dnfullpath":
-                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("fullpath")
+                    self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"])
                     attributes_set.add("dn")
                 else:
-                    col_parts = column.split(":", maxsplit=1)
-                    attributes_set.add(col_parts[0])
-                    if len(col_parts) == 2:
-                        source, steps = col_parts
-                        self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(steps)
+                    step_names = column.split(":")
+                    attributes_set.add(step_names[0])
+                    if len(step_names) > 1:
+                        source = step_names.pop(0)
+                        self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names)
 
         if all_attributes:
             self.attributes = []
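A quick illustration of the changed column parsing: splitting on every `:` (instead of `maxsplit=1`) hands the whole suffix chain to `parse_steps` as a list. The column spec below is a made-up example (`manager` attributes hold DNs, so a DN step plus a length limit is plausible):

```python
# "manager:fullpath:20" = read attribute "manager", apply step "fullpath",
# then limit the value to 20 characters
column = "manager:fullpath:20"
step_names = column.split(":")
source = step_names.pop(0)
print(source, step_names)  # manager ['fullpath', '20']
```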
@@ -7,7 +7,6 @@ import os
 import os.path
 import shlex
 import subprocess
-import sys
 import typing
 
 import yaml
@@ -29,13 +28,13 @@ class Realm:
 
     @staticmethod
     def load(name: str, data: typing.Any) -> Realm:
-        assert isinstance(data, dict), f"Realm section isn't a dictionary: {data!r}"
-        domain = data["domain"]
-        servers = data["servers"].split()
-        forest_root_domain = data.get("forest_root_domain", domain)
-        account = data.get("account", None)
-        password_file = data.get("password_file", None)
-        password_folder = data.get("password_folder", None)
+        assert isinstance(data, dict)
+        domain = data.pop("domain")
+        servers = data.pop("servers").split()
+        forest_root_domain = data.pop("forest_root_domain", domain)
+        account = data.pop("account", None)
+        password_file = data.pop("password_file", None)
+        password_folder = data.pop("password_folder", None)
         return Realm(
             name=name,
             domain=domain,
@@ -102,8 +101,8 @@ class Keyringer(PasswordManager):
     @staticmethod
     def load(data: typing.Any) -> Keyringer:
         assert isinstance(data, dict)
-        keyring = data["keyring"]
-        folder = data.get("folder", "")
+        keyring = data.pop("keyring")
+        folder = data.pop("folder")
         return Keyringer(keyring=keyring, folder=folder)
 
     def get_password(self, password_name: str) -> str:
@@ -146,17 +145,9 @@ class Keepass(PasswordManager):
     def get_password(self, password_name: str) -> str:
         import pykeepass  # already made sure it is available above
 
-        while True:
-            try:
-                password = getpass.getpass(f"KeePass password for database {self.database}: ")
-                kp = pykeepass.PyKeePass(self.database, password=password)
-                break
-            except pykeepass.exceptions.CredentialsError:
-                print("Invalid password", file=sys.stderr)
-
+        password = getpass.getpass(f"KeePass password for database {self.database}: ")
+        kp = pykeepass.PyKeePass(self.database, password=password)
         entry = kp.find_entries(username=password_name, first=True)
         if not entry:
             raise SystemExit(f"no KeePass entry for {password_name!r} found")
         return entry.password  # type: ignore
 
 
@@ -199,8 +190,8 @@ class Config:
         with open(conf_path) as f:
             data = yaml.safe_load(f)
         assert isinstance(data, dict)
-        assert "realms" in data, "Missing realms section in config"
-        realms_data = data["realms"]
+        assert "realms" in data
+        realms_data = data.pop("realms")
         assert isinstance(realms_data, dict)
         realms = {}
         for name, realm_data in realms_data.items():
@@ -210,15 +201,15 @@ class Config:
         if "keyringer" in data:
             if password_manager:
                 raise ValueError("Can only set a single password manager")
-            password_manager = Keyringer.load(data["keyringer"])
+            password_manager = Keyringer.load(data.pop("keyringer"))
         if "keepass" in data:
             if password_manager:
                 raise ValueError("Can only set a single password manager")
-            password_manager = Keepass.load(data["keepass"])
+            password_manager = Keepass.load(data.pop("keepass"))
         if "password-script" in data:
             if password_manager:
                 raise ValueError("Can only set a single password manager")
-            password_manager = PasswordScript.load(data["password-script"])
+            password_manager = PasswordScript.load(data.pop("password-script"))
 
         return Config(realms=realms, password_manager=password_manager)
@@ -229,11 +220,7 @@ class Config:
         """
         if realm.account is None:
             raise RuntimeError("Can't get password without account - should use kerberos instead")
+        if self.password_manager:
+            return self.password_manager.get_password(realm.password_name)
 
-        try:
-            if self.password_manager:
-                return self.password_manager.get_password(realm.password_name)
-
-            return getpass.getpass(f"Enter password for {realm.password_name}: ")
-        except (KeyboardInterrupt, EOFError):
-            raise SystemExit("Password prompt / retrieval aborted")
+        return getpass.getpass(f"Enter password for {realm.password_name}: ")
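The `data["key"]`/`data.get(...)` → `data.pop(...)` change above is more than cosmetic: popping consumes each recognized key as it is read, so whatever remains in the dict afterwards can be flagged as an unknown configuration entry, and a missing required key fails loudly with `KeyError`. A minimal sketch of the pattern (the `load_realm` helper and its keys are illustrative, not the tool's actual code):

```python
def load_realm(data: dict) -> dict:
    # pop() consumes each recognized key; required keys raise KeyError if absent
    realm = {
        "domain": data.pop("domain"),
        "servers": data.pop("servers").split(),
        "account": data.pop("account", None),
    }
    if data:  # anything left over was not recognized
        raise ValueError(f"unknown config keys: {sorted(data)}")
    return realm

print(load_realm({"domain": "example.com", "servers": "dc1 dc2"}))
```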