Compare commits: ldaptool-0 ... 7979fa7cc7 (31 commits)
README.md (+33 lines)
@@ -9,14 +9,43 @@ CLI tool to query LDAP/AD servers
 * Integration with password managers
 * Various output formats
   * Classic LDIF
-  * JSON stream (with detailed or simplified attribute values)
+  * JSON stream (with simplified or detailed attribute values)
   * CSV
-  * Markdown table with stretched columns (for viewing in CLI/for monospace fonts)
+  * Markdown table with stretched columns (for viewing in CLI/for monospace fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
+  * HTML
 * Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl)
 * Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination
   * By default the first 1000 entries are shown, and it errors if there are more results
   * Use `--all` to show all results
+
+## Virtual attributes
+
+`ldaptool` supports constructing new values from existing attributes by adding a `:<postprocess>` suffix (which can be chained, apart from the length limit).
+
+* Some suffixes support an argument as `:<postprocess>[<arg>]`.
+* A single integer as postprocess suffix limits the length of the value; it replaces the last character of the output with `…` if it cut something off.
+* Multi-valued attributes generate multiple virtual attributes; each value is processed individually. (The values are joined afterwards for table output if needed.)
+
+### DN handling
+
+DNs are decoded into lists of lists of `(name, value)` pairs (the inner list usually contains exactly one entry).
+Attributes with a `DC` name are considered part of the "domain"; everything else belongs to the "path".
+(Usually a DN will start with path segments and end with domain segments.)
+The path is read from back to front.
+
+The following postprocess hooks are available:
+
+* `domain`: extracts the domain as DNS FQDN (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com`)
+* `path`: extracts the non-domain parts without names and separates them by `/` (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `Dep1/Someone`)
+* `fullpath`: uses the `domain` as first segment in a path (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com/Dep1/Someone`)
+* `dnslice`: extracts a "slice" from a DN (outer list only); the result is still in DN format.
+
+`path`, `fullpath` and `dnslice` take an optional index/slice as argument, written in Python syntax.
+For `path` and `fullpath` this extracts only the given index/slice from the path (`fullpath` always includes the full FQDN as first segment); `dnslice` operates on the outer list of decoded (lists of) pairs:
+
+* `dn:dnslice[1:]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `OU=Dep1,DC=example,DC=com`
+* `dn:fullpath[:-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `example.com/Dep1`
+* `dn:path[-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `Someone`
+
 ## Authentication, Protocol, Ports

 `ldaptool` always uses TLS for password based authentication, and SASL GSS-API over non-TLS for Kerberos ones.
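The DN handling documented above can be reproduced with plain Python: a minimal sketch of the described semantics using `ldap.dn.str2dn` from python-ldap (already a dependency), not the tool's actual implementation:

```python
import ldap.dn  # python-ldap, already a dependency of ldaptool

def domain_and_path(dn: str) -> tuple[str, list[str]]:
    parts = ldap.dn.str2dn(dn)  # outer list: one entry per RDN
    # DC values form the "domain"; everything else is the "path", read back to front
    domain = ".".join(ava[1] for rdn in parts for ava in rdn if ava[0].lower() == "dc")
    path = [ava[1] for rdn in reversed(parts) for ava in rdn if ava[0].lower() != "dc"]
    return domain, path

domain, path = domain_and_path("CN=Someone,OU=Dep1,DC=example,DC=com")
assert domain == "example.com"                                    # :domain
assert "/".join(path) == "Dep1/Someone"                           # :path
assert f"{domain}/" + "/".join(path[:-1]) == "example.com/Dep1"   # :fullpath[:-1]
assert "/".join(path[-1:]) == "Someone"                           # :path[-1]
```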
debian/changelog (new file, +65 lines)
@@ -0,0 +1,65 @@
+ldaptool (0.6-1) unstable; urgency=medium
+
+  * move --json to --full_json; remove --human JSON output, replace with --json, but don't merge multiple values - use list instead
+  * run ./fmt.sh to fix lint
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Thu, 11 May 2023 17:30:04 +0200
+
+ldaptool (0.5-1) unstable; urgency=medium
+
+  [ Daniel Dizdarevic ]
+  * Fix version requirement for python3.10
+
+  [ Stefan Bühler ]
+  * handle missing KeePass entry
+
+  [ Daniel Dizdarevic ]
+  * Catch invalid passwords in keepass
+  * Catch CTRL+C and CTRL+D in password prompts
+
+  [ Stefan Bühler ]
+  * improve some error messages
+  * improve config loading: don't modify dicts to allow yaml repeated nodes
+  * add argument to postprocess steps and support index/slicing in DN-related hooks; document them
+  * decode securityIdentifier attribute as SID
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Wed, 10 May 2023 19:53:51 +0200
+
+ldaptool (0.4-1) unstable; urgency=medium
+
+  * move argument/column handling to decoder (prepare for more post-processing in decoder)
+  * move json output format handling to main tool from decoder
+  * support attribute post-processing; :<len>, and DN :domain, :path, :fullpath
+  * use Enum instead of StrEnum for python3.10
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Tue, 02 May 2023 16:54:00 +0200
+
+ldaptool (0.3-1) unstable; urgency=medium
+
+  * ldaptool: move output arguments from search to main
+  * run sort internally, refactor table output into separate method
+  * refactor table variant handling
+  * add html output format
+  * README.md: document csvkit dependency
+  * debian: require csvkit (markdown table is an essential feature)
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 19:31:37 +0200
+
+ldaptool (0.2-1) unstable; urgency=medium
+
+  * README.md: fix typo
+  * enable tls unless kerberos is used (SASL GSS-API doesn't seem to work over TLS)
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 17:21:35 +0200
+
+ldaptool (0.1-1) unstable; urgency=medium
+
+  * Initial release.
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:30 +0200
+
+ldaptool (0.1-0) unstable; urgency=medium
+
+  * Stub ITP lintian.
+
+ -- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>  Fri, 28 Apr 2023 12:09:29 +0200
debian/control (new file, +43 lines)
@@ -0,0 +1,43 @@
+Source: ldaptool
+Section: net
+Priority: optional
+Maintainer: Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
+Rules-Requires-Root: no
+Build-Depends:
+ debhelper-compat (= 13),
+ pybuild-plugin-pyproject,
+ flit,
+ dh-sequence-python3,
+ python3,
+ python3-ldap,
+ python3-yaml,
+ python3-pykeepass,
+#Testsuite: autopkgtest-pkg-python
+Standards-Version: 4.6.2
+Homepage: https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool
+
+Package: python3-ldaptool
+Architecture: all
+Depends:
+ ${python3:Depends},
+ ${misc:Depends},
+Recommends:
+ python3-pykeepass,
+Description: CLI tool to run ldap queries
+ CLI tool to query LDAP/AD servers, featuring various output formats
+ and a configuration for different realms.
+ .
+ This package installs the library for Python 3.
+
+Package: ldaptool
+Architecture: all
+Depends:
+ python3-ldaptool (=${binary:Version}),
+ ${python3:Depends},
+ ${misc:Depends},
+ csvkit,
+Description: CLI tool to run ldap queries
+ CLI tool to query LDAP/AD servers, featuring various output formats
+ and a configuration for different realms.
+ .
+ This package installs the script.
debian/copyright (new file, +27 lines)
@@ -0,0 +1,27 @@
+Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
+Source: <https://git-nks-public.tik.uni-stuttgart.de/net/ldaptool>
+Upstream-Name: ldaptool
+
+Files:
+ *
+Copyright:
+ 2023 Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de>
+ 2023 Daniel Dizdarevic <daniel.dizdarevic@tik.uni-stuttgart.de>
+License: MIT
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+ .
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+ .
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
debian/gbp.conf (new file, +5 lines)
@@ -0,0 +1,5 @@
+[DEFAULT]
+pristine-tar = False
+upstream-branch = main
+debian-branch = debian
+upstream-tag = ldaptool-%(version)s
debian/rules (new executable file, +13 lines)
@@ -0,0 +1,13 @@
+#!/usr/bin/make -f
+
+export PYBUILD_NAME=ldaptool
+
+%:
+	dh $@ --buildsystem=pybuild
+
+# we want /usr/bin/ldaptool in a separate package
+override_dh_auto_install:
+	dh_auto_install
+
+	mkdir -p debian/ldaptool/usr
+	mv debian/python3-ldaptool/usr/bin debian/ldaptool/usr/
debian/source/format (new file, +1 line)
@@ -0,0 +1 @@
+3.0 (quilt)
debian/source/options (new file, +1 line)
@@ -0,0 +1 @@
+extend-diff-ignore = "^[^/]*[.]egg-info/|^[.]vscode|/__pycache__/|^venv/|^.mypy_cache/"
pyproject.toml:
@@ -16,7 +16,7 @@ classifiers = [
 ]
 dynamic = ["version", "description"]

-requires-python = "~=3.11"
+requires-python = "~=3.10"
 dependencies = [
     "python-ldap",
     "PyYAML",
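`~=3.10` is a compatible-release pin (>= 3.10, < 4.0). A quick check with the `packaging` library (not a project dependency, only used here for illustration):

```python
from packaging.specifiers import SpecifierSet

spec = SpecifierSet("~=3.10")  # compatible release: >= 3.10, < 4.0
for version in ("3.9", "3.10", "3.11", "4.0"):
    print(version, "ok" if version in spec else "rejected")
# 3.9 rejected / 3.10 ok / 3.11 ok / 4.0 rejected
```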
ldaptool main CLI module:
@@ -2,24 +2,118 @@ from __future__ import annotations

 import argparse
 import csv
+import dataclasses
+import enum
+import html
 import subprocess
 import sys
 import typing

 from ldaptool import decode, search
+from ldaptool._utils import argclasses
 from ldaptool._utils.ldap import Result, SizeLimitExceeded


+class TableOutput(enum.Enum):
+    MARKDOWN = "markdown"
+    CSV = "csv"
+    HTML = "html"
+
+
+def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
+    cell_s = f"<{cell}>"
+    cell_e = f"</{cell}>"
+    return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"
+
+
+@dataclasses.dataclass(slots=True, kw_only=True)
+class Arguments(search.Arguments):
+    raw: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
+    )
+    csv: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(help="CSV output - requires list of attributes"),
+    )
+    table: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="Markdown table output - requires list of attributes",
+        ),
+    )
+    html: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="HTML table output - requires list of attributes",
+        ),
+    )
+    table_output: typing.Optional[TableOutput] = None
+    sort: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="Sorted table output - defaults to markdown --table unless --csv is given",
+        ),
+    )
+    full_json: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="Use full json output (dn as str, attributes as list of dicts containing various representations)",
+        ),
+    )
+    json: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="Use simple json output (dn as str, attributes map to list of human-readable strings)",
+        ),
+    )
+
+    def __post_init__(self) -> None:
+        super(Arguments, self).__post_init__()  # super() not working here, unclear why.
+
+        # pick at most one in csv, (markdown) table, html
+        if [self.csv, self.table, self.html].count(True) > 1:
+            raise SystemExit("Can't use more than one table output type")
+
+        if self.csv:
+            self.table_output = TableOutput.CSV
+        elif self.table:
+            self.table_output = TableOutput.MARKDOWN
+        elif self.html:
+            self.table_output = TableOutput.HTML
+
+        if self.sort and self.table_output is None:
+            # default to markdown table
+            self.table_output = TableOutput.MARKDOWN
+
+        if self.table_output:
+            if not self.columns:
+                raise SystemExit("Table output requires attributes")
+            if self.full_json:
+                raise SystemExit("Can't use both table output and --json")
+            if self.json:
+                raise SystemExit("Can't use both table output and --human")
+
+        if self.raw:
+            if self.table_output:
+                raise SystemExit("Table output requires decode; --raw not allowed")
+            if self.full_json or self.json:
+                raise SystemExit("Decode options require decode; --raw not allowed")
+
+
 class _Context:
     def __init__(self) -> None:
         parser = argparse.ArgumentParser()
-        arguments_p = search.Arguments.add_to_parser(parser)
+        arguments_p = Arguments.add_to_parser(parser)
         args = parser.parse_args()
         try:
             self.config = search.Config.load()
         except Exception as e:
-            raise SystemExit(f"config error: {e}")
+            raise SystemExit(f"config error: {e!r}")
-        self.arguments = arguments_p.from_args(args)
+        try:
+            self.arguments = arguments_p.from_args(args)
+        except decode.InvalidStep as e:
+            raise SystemExit(f"invalid arguments: {e}")

     def run(self) -> None:
         # starting the search sets the base we want to print
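A quick standalone check of `_html_escape_line` as defined in this hunk (copied here so the snippet runs on its own):

```python
import html
import typing

def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
    cell_s = f"<{cell}>"
    cell_e = f"</{cell}>"
    return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"

# header row uses <th>, data rows the default <td>; values are HTML-escaped
print(_html_escape_line(["cn", "mail"], cell="th"), end="")
# -> <tr><th>cn</th><th>mail</th></tr>
print(_html_escape_line(["Someone <admin>", "a@example.com"]), end="")
# -> <tr><td>Someone &lt;admin&gt;</td><td>a@example.com</td></tr>
```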
@@ -40,24 +134,60 @@ class _Context:
             output = proc.stdin

         try:
-            if self.arguments.table:
+            if self.arguments.table_output == TableOutput.MARKDOWN:
                 add_filter(["csvlook"])
-            if self.arguments.sort:
-                add_filter(["csvsort", "--blanks"])
-            self._run_search(search_iterator, stream=output)
+            if self.arguments.table_output:
+                self._table_output(search_iterator, stream=output)
+            else:
+                self._ldif_or_json_output(search_iterator, stream=output)
         finally:
             if procs:
                 output.close()
                 for proc in reversed(procs):
                     proc.wait()

-    def _run_search(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
+    def _to_table_lines(self, search_iterator: typing.Iterable[Result]) -> typing.Iterable[tuple[str, ...]]:
+        decoder = decode.Decoder(arguments=self.arguments)
+        # "human" (json) dicts contain data by lower case key:
+        column_keys = [col.lower() for col in self.arguments.columns]
+        try:
+            for dn, entry in search_iterator:
+                if dn is None:
+                    continue
+                # normal entry
+                assert not isinstance(entry, list)
+                obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
+                yield tuple(obj.get(key, "") for key in column_keys)
+        except SizeLimitExceeded as e:
+            raise SystemExit(f"Error: {e}")
+
+    def _table_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
+        line_iterator = self._to_table_lines(search_iterator)
+        if self.arguments.sort:
+            line_iterator = sorted(line_iterator)
+
+        if self.arguments.table_output in [TableOutput.CSV, TableOutput.MARKDOWN]:
+            csv_out = csv.writer(stream, lineterminator="\n")
+            csv_out.writerow(self.arguments.columns)
+
+            for line in line_iterator:
+                csv_out.writerow(line)
+        else:
+            assert self.arguments.table_output == TableOutput.HTML
+
+            stream.write("<table>\n")
+            stream.write(_html_escape_line(self.arguments.columns, cell="th"))
+            for line in line_iterator:
+                stream.write(_html_escape_line(line))
+            stream.write("</table>\n")
+
+    def _ldif_or_json_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
         decoder = decode.Decoder(arguments=self.arguments)

         num_responses = 0
         num_entries = 0

-        ldif_output = not (self.arguments.csv or self.arguments.json or self.arguments.human)
+        ldif_output = not (self.arguments.full_json or self.arguments.json)

         if ldif_output:
             print("# extended LDIF")
@@ -72,22 +202,11 @@ class _Context:
             print("#")
             print()

-        if self.arguments.csv:
-            csv_out = csv.DictWriter(
-                stream,
-                fieldnames=self.arguments.columns,
-                lineterminator="\n",
-                extrasaction="ignore",
-            )
-            csv_out.writeheader()
-            # dicts contain data by lower case key
-            csv_out.fieldnames = [col.lower() for col in self.arguments.columns]
-
         try:
             for dn, entry in search_iterator:
                 num_responses += 1
                 if dn is None:
-                    if not self.arguments.csv:
+                    if ldif_output:
                         print("# search reference")
                         for ref in entry:
                             assert isinstance(ref, str)
@@ -97,11 +216,13 @@ class _Context:
                 # normal entry
                 assert not isinstance(entry, list)
                 num_entries += 1
-                obj = decoder.read(dn=dn, entry=entry)
-                if self.arguments.csv:
-                    csv_out.writerow(decoder.human(dn=dn, entry=obj))
+                if ldif_output:
+                    decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
+                elif self.arguments.json:
+                    decoder.read_and_emit_simple_json(dn=dn, entry=entry, file=stream)
                 else:
-                    decoder.emit(dn=dn, entry=obj)
+                    assert self.arguments.full_json
+                    decoder.read_and_emit_full_json(dn=dn, entry=entry, file=stream)
         except SizeLimitExceeded as e:
             raise SystemExit(f"Error: {e}")
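The CSV/markdown path can be exercised standalone; a sketch with invented columns and rows (`csvlook` from csvkit is only needed to render the CSV as a markdown table):

```python
import csv
import io

columns = ["cn", "mail"]  # invented column list
rows = [("Someone", "someone@example.com"), ("Other", "other@example.com")]

buf = io.StringIO()
csv_out = csv.writer(buf, lineterminator="\n")
csv_out.writerow(columns)   # header row keeps the original casing
for row in sorted(rows):    # --sort now sorts the decoded lines internally
    csv_out.writerow(row)
print(buf.getvalue(), end="")
# piping this CSV through `csvlook` yields the stretched markdown table
```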
src/ldaptool/_utils/argclasses.py:
@@ -74,6 +74,9 @@ _TArgs = typing.TypeVar("_TArgs", bound="BaseArguments")

 @dataclasses.dataclass(slots=True, kw_only=True)
 class BaseArguments:
+    def __post_init__(self) -> None:
+        pass
+
     @classmethod
     def add_fields_to_parser(
         cls: type[_TArgs],
src/ldaptool/_utils/dninfo.py:
@@ -33,19 +33,26 @@ class DNInfo:
     def domain(self) -> str:
         return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc")

-    def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str:
-        return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc")
+    def _path(self, *, escape: typing.Callable[[str], str], sep: str, selection: slice = slice(None)) -> str:
+        rev_flattened = [ava[1] for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc"]
+        return sep.join(value for value in rev_flattened[selection])
+
+    def sliced_path(self, selection: slice, /) -> str:
+        return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/", selection=selection)

     @functools.cached_property
     def path(self) -> str:
-        return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/")
+        return self.sliced_path(slice(None))

-    @property
-    def full_path(self) -> str:
+    def sliced_full_path(self, selection: slice, /) -> str:
         domain = self.domain
-        path = self.path
+        path = self.sliced_path(selection)
         if not path:
             return self.domain
         if not domain:
             return self.path
         return f"{domain}/{path}"
+
+    @property
+    def full_path(self) -> str:
+        return self.sliced_full_path(slice(None))
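`dnslice` builds on the same decoded `parts`; with python-ldap the `dn:dnslice[1:]` example from the README reduces to:

```python
import ldap.dn  # python-ldap

dn = "CN=Someone,OU=Dep1,DC=example,DC=com"
parts = ldap.dn.str2dn(dn)        # outer list: one entry per RDN
print(ldap.dn.dn2str(parts[1:]))  # -> OU=Dep1,DC=example,DC=com  (dn:dnslice[1:])
```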
src/ldaptool/decode/__init__.py:
@@ -1,10 +1,12 @@
 from __future__ import annotations

 from ._decoder import Attribute, Decoder
+from ._postprocess import InvalidStep
 from .arguments import Arguments

 __all__ = [
     "Arguments",
     "Attribute",
     "Decoder",
+    "InvalidStep",
 ]
src/ldaptool/decode/_decoder.py:
@@ -8,8 +8,6 @@ import sys
 import typing
 import uuid

-from ldaptool._utils.dninfo import DNInfo
-
 from . import _types
 from .arguments import Arguments
@@ -103,7 +101,7 @@ class Attribute:
             return

     def _try_decode(self, args: Arguments) -> None:
-        if self.name in ("objectSid",):
+        if self.name in ("objectSid", "securityIdentifier"):
             self._try_decode_sid()
         elif self.name in ("msExchMailboxGuid", "objectGUID"):
             self._try_decode_uuid()
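For reference, the usual binary SID layout behind `objectSid`/`securityIdentifier`: a standalone sketch of the common decoding, not this module's `_try_decode_sid` (whose body is not part of this diff):

```python
import struct

def sid_to_str(raw: bytes) -> str:
    # revision (1 byte), subauthority count (1 byte),
    # identifier authority (6 bytes big-endian), then little-endian u32 subauthorities
    revision, count = raw[0], raw[1]
    authority = int.from_bytes(raw[2:8], "big")
    subs = struct.unpack_from(f"<{count}I", raw, 8)
    return "S-" + "-".join(str(x) for x in (revision, authority, *subs))

# well-known SID S-1-5-32-544 (BUILTIN\Administrators), assembled by hand
raw = bytes([1, 2]) + (5).to_bytes(6, "big") + struct.pack("<2I", 32, 544)
assert sid_to_str(raw) == "S-1-5-32-544"
```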
@@ -122,16 +120,16 @@ class Attribute:
     def _base64_value(self) -> str:
         return base64.b64encode(self.raw).decode("ascii")

-    def print(self) -> None:
+    def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
         if not self.decoded is None:
             comment = self.utf8_clean
             if comment is None:
                 comment = self._base64_value
-            print(f"{self.name}: {self.decoded} # {comment}")
+            print(f"{self.name}: {self.decoded} # {comment}", file=file)
         elif not self.utf8_clean is None:
-            print(f"{self.name}: {self.utf8_clean}")
+            print(f"{self.name}: {self.utf8_clean}", file=file)
         else:
-            print(f"{self.name}:: {self._base64_value}")
+            print(f"{self.name}:: {self._base64_value}", file=file)

     def to_json(self) -> dict[str, typing.Any]:
         item: dict[str, typing.Any] = {}
@@ -175,55 +173,69 @@ class Decoder:
             name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
             for name, raw_values in entry.items()
         }
-        if self.arguments.dndomain or self.arguments.dnpath or self.arguments.dnfullpath:
-            dninfo = DNInfo(dn=dn)
-            if self.arguments.dndomain:
-                decoded_entry["dndomain"] = [
-                    Attribute.fake_attribute("dndomain", dninfo.domain),
-                ]
-            if self.arguments.dnpath:
-                decoded_entry["dnpath"] = [
-                    Attribute.fake_attribute("dnpath", dninfo.path),
-                ]
-            if self.arguments.dnfullpath:
-                decoded_entry["dnfullpath"] = [
-                    Attribute.fake_attribute("dnfullpath", dninfo.full_path),
-                ]
+
+        for attr, post_processes in self.arguments.post_process.items():
+            if attr == "dn":
+                values = [dn]
+            else:
+                attrs = decoded_entry.get(attr, None)
+                if attrs is None:
+                    continue
+                values = [at.human() for at in attrs]
+            for column, post_process in post_processes.items():
+                decoded_entry[column] = [
+                    Attribute.fake_attribute(column, post_process.process(value)) for value in values
+                ]

         return decoded_entry

-    def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
+    def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
         emit: dict[str, typing.Any] = dict(dn=dn)
-        for name, attrs in entry.items():
-            emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
+        for name, attrs in obj.items():
+            emit[name] = [attr.human() for attr in attrs]
         return emit

-    def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
+    def emit_simple_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.human(dn=dn, obj=obj)
+        json.dump(emit, file, ensure_ascii=False)
+        print(file=file)  # terminate output dicts by newline
+
+    def read_and_emit_simple_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_simple_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
+
+    def full_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
         emit: dict[str, typing.Any] = dict(dn=dn)
-        for name, attrs in entry.items():
+        for name, attrs in obj.items():
             emit[name] = [attr.to_json() for attr in attrs]
         return emit

-    def _emit_json(self, *, dn: str, entry: TDecoded) -> None:
-        if self.arguments.human:
-            emit = self.human(dn=dn, entry=entry)
-        else:
-            emit = self.json(dn=dn, entry=entry)
-        json.dump(emit, sys.stdout, ensure_ascii=False)
-        print()  # terminate output dicts by newline
+    def emit_full_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.full_json(dn=dn, obj=obj)
+        json.dump(emit, file, ensure_ascii=False)
+        print(file=file)  # terminate output dicts by newline

-    def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None:
-        print(f"dn: {dn}")
-        for attrs in entry.values():
-            for attr in attrs:
-                attr.print()
-        print()  # separate entries with newlines
+    def read_and_emit_full_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_full_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

-    def emit(self, *, dn: str, entry: TDecoded) -> None:
-        if self.arguments.human or self.arguments.json:
-            self._emit_json(dn=dn, entry=entry)
-        else:
-            self._emit_ldif(dn=dn, entry=entry)
+    def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        print(f"dn: {dn}", file=file)
+        attrs: typing.Optional[list[Attribute]]
+        if not self.arguments.attributes:
+            # show all attributes - use order from server
+            for attrs in obj.values():
+                for attr in attrs:
+                    attr.print(file=file)
+        else:
+            # only selected columns; use given order
+            for column in self.arguments.columns_keys:
+                if column == "dn":
+                    continue  # already printed dn
+                attrs = obj.get(column, None)
+                if attrs is None:
+                    continue
+                for attr in attrs:
+                    attr.print(file=file)
+        print(file=file)  # separate entries with newlines

-    def handle(self, *, dn: str, entry: TEntry) -> None:
-        entry_attrs = self.read(dn=dn, entry=entry)
-        self.emit(dn=dn, entry=entry_attrs)
+    def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
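Both JSON writers emit one dict per entry, newline-terminated (JSON-lines style). A minimal sketch of that pattern with an invented entry in the "simple" shape (dn as str, attributes as lists of strings):

```python
import json
import sys
import typing

def emit_json_line(obj: dict[str, typing.Any], file: typing.IO[str] = sys.stdout) -> None:
    # mirrors emit_simple_json/emit_full_json: no ASCII escaping, newline-terminated dicts
    json.dump(obj, file, ensure_ascii=False)
    print(file=file)

emit_json_line({"dn": "CN=Someone,OU=Dep1,DC=example,DC=com", "mail": ["someone@example.com"]})
```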
src/ldaptool/decode/_postprocess.py (new file, +195 lines)
@@ -0,0 +1,195 @@
+from __future__ import annotations
+
+import abc
+import dataclasses
+import typing
+
+import ldap.dn
+
+from ldaptool._utils.dninfo import DNInfo
+
+
+class Step(abc.ABC):
+    __slots__ = ()
+
+    @abc.abstractmethod
+    def step(self, value: str) -> str:
+        ...
+
+
+def _args_to_slice(args: str) -> slice:
+    args = args.strip()
+    if not args:
+        return slice(None)
+    params: list[typing.Optional[int]] = []
+    for arg in args.split(":"):
+        arg = arg.strip()
+        if arg:
+            params.append(int(arg))
+        else:
+            params.append(None)
+    if len(params) == 1:
+        assert isinstance(params[0], int)
+        ndx = params[0]
+        if ndx == -1:
+            return slice(ndx, None)  # from last element to end - still exactly one element
+        # this doesn't work for ndx == -1: slice(-1, 0) is always empty. otherwise it should return [ndx:][:1].
+        return slice(ndx, ndx + 1)
+    return slice(*params)
+
+
+@dataclasses.dataclass(slots=True)
+class MaxLength(Step):
+    limit: int
+
+    def step(self, value: str) -> str:
+        if not self.limit or len(value) <= self.limit:
+            return value
+        return value[: self.limit - 1] + "…"
+
+
+@dataclasses.dataclass(slots=True)
+class DNDomain(Step):
+    def __init__(self, args: str) -> None:
+        if args:
+            raise ValueError(":domain doesn't support an argument")
+
+    def step(self, value: str) -> str:
+        try:
+            dninfo = DNInfo(dn=value)
+        except Exception:
+            # not a valid DN -> no processing
+            return value
+        return dninfo.domain
+
+
+@dataclasses.dataclass(slots=True)
+class DNPath(Step):
+    path_slice: slice
+
+    def __init__(self, args: str) -> None:
+        self.path_slice = _args_to_slice(args)
+
+    def step(self, value: str) -> str:
+        try:
+            dninfo = DNInfo(dn=value)
+        except Exception:
+            # not a valid DN -> no processing
+            return value
+        return dninfo.sliced_path(self.path_slice)
+
+
+@dataclasses.dataclass(slots=True)
+class DNFullPath(Step):
+    path_slice: slice
+
+    def __init__(self, args: str) -> None:
+        self.path_slice = _args_to_slice(args)
+
+    def step(self, value: str) -> str:
+        try:
+            dninfo = DNInfo(dn=value)
+        except Exception:
+            # not a valid DN -> no processing
+            return value
+        return dninfo.sliced_full_path(self.path_slice)
+
+
+@dataclasses.dataclass(slots=True)
+class DNSlice(Step):
+    slice: slice
+
+    def __init__(self, args: str) -> None:
+        self.slice = _args_to_slice(args)
+
+    def step(self, value: str) -> str:
+        try:
+            dninfo = DNInfo(dn=value)
+        except Exception:
+            # not a valid DN -> no processing
+            return value
+        return ldap.dn.dn2str(dninfo.parts[self.slice])  # type: ignore
+
+
+_STEPS: dict[str, typing.Callable[[str], Step]] = {
+    "domain": DNDomain,
+    "path": DNPath,
+    "fullpath": DNFullPath,
+    "dnslice": DNSlice,
+}
+
+
+class InvalidStep(Exception):
+    pass
+
+
+@dataclasses.dataclass(slots=True)
+class PostProcess:
+    steps: list[Step]
+
+    def process(self, value: str) -> str:
+        for step in self.steps:
+            value = step.step(value)
+        return value
+
+
+def parse_steps(steps: str) -> PostProcess:
+    result: list[Step] = []
+
+    cur_id_start = 0
+    cur_args_start = -1
+    current_id = ""
+    current_args = ""
+    count_brackets = 0
+    step_done = False
+
+    def handle_step() -> None:
+        nonlocal cur_id_start, cur_args_start, current_id, current_args, step_done
+        assert step_done
+
+        step_i = _STEPS.get(current_id, None)
+        if step_i is None:
+            try:
+                max_len = int(current_id)
+                result.append(MaxLength(max_len))
+            except ValueError:
+                raise InvalidStep(f"Unknown post-processing step {current_id!r}")
+        else:
+            result.append(step_i(current_args))
+
+        cur_id_start = pos + 1
+        cur_args_start = -1
+        current_id = ""
+        current_args = ""
+        step_done = False
+
+    for pos, char in enumerate(steps):
+        if step_done:
+            if char != ":":
+                raise InvalidStep(f"Require : after step, found {char!r} at pos {pos}")
+            handle_step()
+        elif char == "[":
+            if count_brackets == 0:
+                # end of identifier
+                current_id = steps[cur_id_start:pos]
+                cur_args_start = pos + 1
+            count_brackets += 1
+        elif char == "]":
+            count_brackets -= 1
+            if count_brackets == 0:
+                current_args = steps[cur_args_start:pos]
+                step_done = True
+        elif count_brackets:
+            continue
+        elif not char.isalnum():
+            raise InvalidStep(f"Expecting either alphanumeric, ':' or '[', got {char!r} at {pos}")
+
+    if not step_done:
+        current_id = steps[cur_id_start:]
+        if current_id:
+            step_done = True
+
+    if step_done:
+        handle_step()
+
+    return PostProcess(result)
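The slice argument parsing above maps directly onto Python slices; a standalone mirror of `_args_to_slice` (same semantics, compressed) with the README's `path` examples:

```python
def args_to_slice(args: str) -> slice:
    # standalone mirror of _args_to_slice above
    args = args.strip()
    if not args:
        return slice(None)
    params = [int(a) if a.strip() else None for a in args.split(":")]
    if len(params) == 1:
        ndx = params[0]
        # a single index selects exactly one element, even for -1
        return slice(ndx, None) if ndx == -1 else slice(ndx, ndx + 1)
    return slice(*params)

path = ["Dep1", "Someone"]  # non-domain DN values, read back to front
assert path[args_to_slice("")] == ["Dep1", "Someone"]
assert path[args_to_slice("-1")] == ["Someone"]   # :path[-1]
assert path[args_to_slice(":-1")] == ["Dep1"]     # :path[:-1]
assert path[args_to_slice("1:")] == ["Someone"]   # :path[1:]
```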
src/ldaptool/decode/arguments.py:
@@ -1,47 +1,78 @@
 from __future__ import annotations

+import argparse
 import dataclasses

 from ldaptool._utils import argclasses

+from . import _postprocess
+
+
+def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
+    parser.add_argument(
+        metavar="attributes",
+        dest=dest,
+        nargs="*",
+        help="""
+            Attributes to lookup (and columns to display in tables).
+            Fake attributes `dndomain`, `dnpath` and `dnfullpath` are available (created from dn).
+        """,
+    )
+

 @dataclasses.dataclass(slots=True, kw_only=True)
 class Arguments(argclasses.BaseArguments):
-    json: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Use full json output"),
-    )
-    human: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
-    )
+    columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
+    columns_keys: list[str] = dataclasses.field(default_factory=list)  # lower case column names
+    attributes: list[str] = dataclasses.field(default_factory=list)
     human_separator: str = dataclasses.field(
         default=", ",
         metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
     )
     dateonly: bool = dataclasses.field(
         default=True,
         metadata=argclasses.arg(help="Use only date part of decoded timestamps"),
     )
-    dndomain: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Whether to export a virtual dndomain attribute (DNS domain from dn)"),
-    )
-    dnpath: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="""
-                Whether to export a virtual dnpath attribute
-                ('/' joined values of reversed DN without DNS labels)
-            """
-        ),
-    )
-    dnfullpath: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="""
-                Whether to export a virtual dnfullpath attribute
-                ('/' joined values of reversed DN; DNS domain as first label)
-            """
-        ),
-    )
+    post_process: dict[str, dict[str, _postprocess.PostProcess]] = dataclasses.field(default_factory=dict)
+
+    def __post_init__(self) -> None:
+        super(Arguments, self).__post_init__()  # super() not working here, unclear why.
+
+        # extract special attribute names
+        all_attributes = False
+        attributes_set: set[str] = set()
+        self.columns_keys = []
+        for column in list(self.columns):
+            column = column.lower()
+            if column == "*":
+                # '*' not allowed as table column, but for LDIF this means: get ALL attributes + do post processing
+                self.columns.remove("*")
+                all_attributes = True
+                continue
+            self.columns_keys.append(column)
+            if column == "dndomain":
+                self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("domain")
+                attributes_set.add("dn")
+            elif column == "dnpath":
+                self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("path")
+                attributes_set.add("dn")
+            elif column == "dnfullpath":
+                self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("fullpath")
+                attributes_set.add("dn")
+            else:
+                col_parts = column.split(":", maxsplit=1)
+                attributes_set.add(col_parts[0])
+                if len(col_parts) == 2:
+                    source, steps = col_parts
+                    self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(steps)
+
+        if all_attributes:
+            self.attributes = []
+        else:
+            self.attributes = list(attributes_set)
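Given this parsing, a chained column spec such as `dn:fullpath[:-1]:20` (an invented example) splits into source attribute and step chain as follows:

```python
# invented chained spec: slice the full path, then cap the result at 20 chars
column = "dn:fullpath[:-1]:20"
source, steps = column.split(":", maxsplit=1)
assert source == "dn"
assert steps == "fullpath[:-1]:20"
# parse_steps(steps) then yields [DNFullPath(slice(None, -1)), MaxLength(20)]:
# the sliced full path, shortened to at most 20 characters (last char replaced by "…")
```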
src/ldaptool/search/arguments.py:
@@ -1,6 +1,5 @@
 from __future__ import annotations

-import argparse
 import dataclasses
 import typing

@@ -8,28 +7,8 @@ import ldaptool.decode.arguments
 from ldaptool._utils import argclasses


-def _parser_add_attributes(parser: argparse.ArgumentParser, dest: str) -> None:
-    parser.add_argument(
-        metavar="attributes",
-        dest=dest,
-        nargs="*",
-        help="""
-            Attributes to lookup (and columns to display in tables).
-            Fake attributes `dndomain`, `dnpath` an `dnfullpath` are available (created from dn).
-        """,
-    )
-
-
 @dataclasses.dataclass(slots=True, kw_only=True)
 class Arguments(ldaptool.decode.arguments.Arguments):
-    # overwrite fields for fake attributes to remove them from argparse;
-    # we enable those based on the attribute list
-    dndomain: bool = False
-    dnpath: bool = False
-    dnfullpath: bool = False
-
-    attributes: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
-    columns: list[str] = dataclasses.field(default_factory=list)
     filter: typing.Optional[str] = dataclasses.field(default=None, metadata=argclasses.arg(help="LDAP query filter"))
     find: typing.Optional[str] = dataclasses.field(
         default=None,
@@ -44,10 +23,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
         default=False,
         metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"),
     )
-    raw: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
-    )
     realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in"))
     server: typing.Optional[str] = dataclasses.field(
         default=None,
@@ -77,24 +52,10 @@ class Arguments(ldaptool.decode.arguments.Arguments):
             help="Explicit search base (defaults to root of domain / forest with --gc)",
         ),
     )
-    csv: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="CSV output - requires list of attributes"),
-    )
-    table: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="Markdown table output - requires list of attributes",
-        ),
-    )
-    sort: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="Sorted table output - defaults to markdown --table unless --csv is given",
-        ),
-    )

     def __post_init__(self) -> None:
+        super(Arguments, self).__post_init__()  # super() not working here, unclear why.
+
         if not self.filter is None:
             if not self.find is None:
                 raise SystemExit("Can't use both --find and --filter")
@@ -106,46 +67,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):
         else:
             # probably doesn't like empty filter?
             self.filter = "(objectClass=*)"
-
-        # can't print both csv and markdown
-        if self.csv and self.table:
-            raise SystemExit("Can't use both --table and --csv")
-
-        if self.sort:
-            if not self.table and not self.csv:
-                # default to markdown table
-                self.table = True
-
-        if self.table:
-            # markdown requires underlying csv
-            self.csv = True
-
-        # extract special attribute names
-        self.columns = self.attributes  # use all names for columns (headings and their order)
-        attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes}  # index by lowercase name
-        # create fake attributes on demand
-        if attributes_set.pop("dndomain", ""):
-            self.dndomain = True
-        if attributes_set.pop("dnpath", ""):
-            self.dnpath = True
-        if attributes_set.pop("dnfullpath", ""):
-            self.dnfullpath = True
-        # store remaining attributes (with original case)
-        self.attributes = list(attributes_set.values())
-        if self.columns and not self.attributes:
-            # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
-            self.attributes = ["dn"]
-
-        if self.csv:
-            if not self.columns:
-                raise SystemExit("Table output requires attributes")
-            if self.json:
-                raise SystemExit("Can't use both --table / --csv / --sort and --json")
-            if self.human:
-                raise SystemExit("Can't use both --table / --csv / --sort and --human")
-
-        if self.raw:
-            if self.csv:
-                raise SystemExit("Table output requires decode; --raw not allowed")
-            if self.json or self.human:
-                raise SystemExit("Decode options require decode; --raw not allowed")
ldaptool search config module (Realm / Config):
@@ -7,6 +7,7 @@ import os
 import os.path
 import shlex
 import subprocess
+import sys
 import typing

 import yaml
@@ -28,13 +29,13 @@ class Realm:

     @staticmethod
     def load(name: str, data: typing.Any) -> Realm:
-        assert isinstance(data, dict)
-        domain = data.pop("domain")
-        servers = data.pop("servers").split()
-        forest_root_domain = data.pop("forest_root_domain", domain)
-        account = data.pop("account", None)
-        password_file = data.pop("password_file", None)
-        password_folder = data.pop("password_folder", None)
+        assert isinstance(data, dict), f"Realm section isn't a dictionary: {data!r}"
+        domain = data["domain"]
+        servers = data["servers"].split()
+        forest_root_domain = data.get("forest_root_domain", domain)
+        account = data.get("account", None)
+        password_file = data.get("password_file", None)
+        password_folder = data.get("password_folder", None)
         return Realm(
             name=name,
             domain=domain,
@@ -101,8 +102,8 @@ class Keyringer(PasswordManager):
     @staticmethod
     def load(data: typing.Any) -> Keyringer:
         assert isinstance(data, dict)
-        keyring = data.pop("keyring")
-        folder = data.pop("folder")
+        keyring = data["keyring"]
+        folder = data.get("folder", "")
         return Keyringer(keyring=keyring, folder=folder)

     def get_password(self, password_name: str) -> str:
@@ -145,9 +146,17 @@ class Keepass(PasswordManager):
     def get_password(self, password_name: str) -> str:
         import pykeepass  # already made sure it is available above

-        password = getpass.getpass(f"KeePass password for database {self.database}: ")
-        kp = pykeepass.PyKeePass(self.database, password=password)
+        while True:
+            try:
+                password = getpass.getpass(f"KeePass password for database {self.database}: ")
+                kp = pykeepass.PyKeePass(self.database, password=password)
+                break
+            except pykeepass.exceptions.CredentialsError:
+                print("Invalid password", file=sys.stderr)
+
         entry = kp.find_entries(username=password_name, first=True)
+        if not entry:
+            raise SystemExit(f"no KeePass entry for {password_name!r} found")
         return entry.password  # type: ignore
@@ -190,8 +199,8 @@ class Config:
         with open(conf_path) as f:
             data = yaml.safe_load(f)
         assert isinstance(data, dict)
-        assert "realms" in data
-        realms_data = data.pop("realms")
+        assert "realms" in data, "Missing realms section in config"
+        realms_data = data["realms"]
         assert isinstance(realms_data, dict)
         realms = {}
         for name, realm_data in realms_data.items():
@@ -201,15 +210,15 @@ class Config:
         if "keyringer" in data:
             if password_manager:
                 raise ValueError("Can only set a single password manager")
-            password_manager = Keyringer.load(data.pop("keyringer"))
+            password_manager = Keyringer.load(data["keyringer"])
         if "keepass" in data:
             if password_manager:
                 raise ValueError("Can only set a single password manager")
-            password_manager = Keepass.load(data.pop("keepass"))
+            password_manager = Keepass.load(data["keepass"])
         if "password-script" in data:
             if password_manager:
                 raise ValueError("Can only set a single password manager")
-            password_manager = PasswordScript.load(data.pop("password-script"))
+            password_manager = PasswordScript.load(data["password-script"])

         return Config(realms=realms, password_manager=password_manager)
@@ -220,7 +229,11 @@ class Config:
         """
         if realm.account is None:
             raise RuntimeError("Can't get password without account - should use kerberos instead")
-        if self.password_manager:
-            return self.password_manager.get_password(realm.password_name)
-        return getpass.getpass(f"Enter password for {realm.password_name}: ")
+        try:
+            if self.password_manager:
+                return self.password_manager.get_password(realm.password_name)
+            return getpass.getpass(f"Enter password for {realm.password_name}: ")
+        except (KeyboardInterrupt, EOFError):
+            raise SystemExit("Password prompt / retrieval aborted")
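The switch from `pop()` to plain indexing/`get()` is what allows YAML anchors and repeated nodes in the config (see the 0.5-1 changelog entry: "don't modify dicts to allow yaml repeated nodes"). A minimal sketch with an invented realms document:

```python
import yaml

# invented example config; structure follows Realm.load above
doc = """
realms:
  example: &base
    domain: example.com
    servers: dc1.example.com dc2.example.com
    account: searchuser
  subtree:
    <<: *base
    forest_root_domain: example.com
"""
data = yaml.safe_load(doc)
assert "realms" in data, "Missing realms section in config"
for name, realm in data["realms"].items():
    # realm dicts are read without mutation, so the shared &base node stays intact
    print(name, realm["domain"], realm["servers"].split())
```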