ldaptool-0.3

-----BEGIN PGP SIGNATURE-----
 
 iQJYBAABCgBCFiEEcdms641aWv8vJSXMIcx4mUG+20gFAmRMAqIkHHN0ZWZhbi5i
 dWVobGVyQHRpay51bmktc3R1dHRnYXJ0LmRlAAoJECHMeJlBvttIt3IQAIMnGAHS
 doZPKZXestqS+PnHSZme/xywtI+Nc/IVjaR8/GJLHyy+NorRmCfW2xZHQTBot6mW
 8mAc0fVhIJG4Go1nb/UJ7XrpPrnL+VtM7Ab/WsdShIV7XwoFbdQPKaKUTbXoq0It
 k41rlAdMzXPOZYGkDnBKxwOpFvq906Ca0Edurfu38dCO4ZvFmxeS5Cu8VBY/a4Ma
 iSwZBK/oQuKvTQRQcuiUTr4p69mT5M+BpMQ6iq3zBYuOvfV6u9wO9CycCmagfv/X
 e64tdDlJkcV4OWMxkzctahls+rN+mb4+LniiRJwWw3CjSk8Ym+87QHfJiG32wXGr
 q/eYYcmrCd0u/rUT7e7+58Ln19T27pRam3ZRtdBpqGR7Xt8iqaZu3Yx96+UKwrAv
 e/W1Emvb53J8CavRmYdNwOkhLrcj5SI78WyHNa6jBr4GV1ElDPgtMkjqxo7KUPys
 AQAe8ukcVXkcFcWsm0iNH/zOQWP8vXuJMNX7p24ui6h5K+E78TkUVXuxj/+5bBk8
 r8Sb783Ii27T9GheeRtpMDoOZRTvXs4P1Q6zcKB6ZkTiw/ygrbY++zgHiWMPajxR
 zo0e9dYBYq0IRkKlkYoN36ZTquA62YDj4pJ+8jPg6i3+UyVbFS7U2Ohj1O43cYwH
 kAclanAOK5WlowbbyV7xTh0h4N1IekcQZzrx
 =TVlY
 -----END PGP SIGNATURE-----

Merge tag 'ldaptool-0.3' into debian

ldaptool-0.3
This commit is contained in:
Stefan Bühler 2023-04-28 19:30:40 +02:00
commit 54a23e8060
3 changed files with 126 additions and 71 deletions

View File

@ -11,7 +11,8 @@ CLI tool to query LDAP/AD servers
* Classic LDIF * Classic LDIF
* JSON stream (with detailed or simplified attribute values) * JSON stream (with detailed or simplified attribute values)
* CSV * CSV
* Markdown table with stretched columns (for viewing in CLI/for monospaces fonts) * Markdown table with stretched columns (for viewing in CLI/for monospaces fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
* HTML
* Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl) * Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl)
* Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination * Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination
* By default the first 1000 entries are shown, and it errors if there are more results * By default the first 1000 entries are shown, and it errors if there are more results

View File

@ -2,18 +2,97 @@ from __future__ import annotations
import argparse import argparse
import csv import csv
import dataclasses
import enum
import html
import subprocess import subprocess
import sys import sys
import typing import typing
from ldaptool import decode, search from ldaptool import decode, search
from ldaptool._utils import argclasses
from ldaptool._utils.ldap import Result, SizeLimitExceeded from ldaptool._utils.ldap import Result, SizeLimitExceeded
class TableOutput(enum.StrEnum):
MARKDOWN = "markdown"
CSV = "csv"
HTML = "html"
def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
cell_s = f"<{cell}>"
cell_e = f"</{cell}>"
return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(search.Arguments):
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
table_output: typing.Optional[TableOutput] = None
html: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="HTML table output - requires list of attributes",
),
)
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
# pick at most one in csv, (markdown) table, html
if [self.csv, self.table, self.html].count(True) > 1:
raise SystemExit("Can't use more than one table output type")
if self.csv:
self.table_output = TableOutput.CSV
elif self.table:
self.table_output = TableOutput.MARKDOWN
elif self.html:
self.table_output = TableOutput.HTML
if self.sort and self.table_output is None:
# default to markdown table
self.table_output = TableOutput.MARKDOWN
if self.table_output:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.json:
raise SystemExit("Can't use both table output and --json")
if self.human:
raise SystemExit("Can't use both table output and --human")
if self.raw:
if self.table_output:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.json or self.human:
raise SystemExit("Decode options require decode; --raw not allowed")
class _Context: class _Context:
def __init__(self) -> None: def __init__(self) -> None:
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
arguments_p = search.Arguments.add_to_parser(parser) arguments_p = Arguments.add_to_parser(parser)
args = parser.parse_args() args = parser.parse_args()
try: try:
self.config = search.Config.load() self.config = search.Config.load()
@ -40,24 +119,60 @@ class _Context:
output = proc.stdin output = proc.stdin
try: try:
if self.arguments.table: if self.arguments.table_output == TableOutput.MARKDOWN:
add_filter(["csvlook"]) add_filter(["csvlook"])
if self.arguments.sort: if self.arguments.table_output:
add_filter(["csvsort", "--blanks"]) self._table_output(search_iterator, stream=output)
self._run_search(search_iterator, stream=output) else:
self._ldif_or_json_output(search_iterator, stream=output)
finally: finally:
if procs: if procs:
output.close() output.close()
for proc in reversed(procs): for proc in reversed(procs):
proc.wait() proc.wait()
def _run_search(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None: def _to_table_lines(self, search_iterator: typing.Iterable[Result]) -> typing.Iterable[tuple[str, ...]]:
decoder = decode.Decoder(arguments=self.arguments)
# "human" (json) dicts contain data by lower case key:
column_keys = [col.lower() for col in self.arguments.columns]
try:
for dn, entry in search_iterator:
if dn is None:
continue
# normal entry
assert not isinstance(entry, list)
obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
yield tuple(obj.get(key, "") for key in column_keys)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")
def _table_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
line_iterator = self._to_table_lines(search_iterator)
if self.arguments.sort:
line_iterator = sorted(line_iterator)
if self.arguments.table_output in [TableOutput.CSV, TableOutput.MARKDOWN]:
csv_out = csv.writer(stream, lineterminator="\n")
csv_out.writerow(self.arguments.columns)
for line in line_iterator:
csv_out.writerow(line)
else:
assert self.arguments.table_output == TableOutput.HTML
stream.write("<table>\n")
stream.write(_html_escape_line(self.arguments.columns, cell="th"))
for line in line_iterator:
stream.write(_html_escape_line(line))
stream.write("</table>\n")
def _ldif_or_json_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
decoder = decode.Decoder(arguments=self.arguments) decoder = decode.Decoder(arguments=self.arguments)
num_responses = 0 num_responses = 0
num_entries = 0 num_entries = 0
ldif_output = not (self.arguments.csv or self.arguments.json or self.arguments.human) ldif_output = not (self.arguments.json or self.arguments.human)
if ldif_output: if ldif_output:
print("# extended LDIF") print("# extended LDIF")
@ -72,22 +187,11 @@ class _Context:
print("#") print("#")
print() print()
if self.arguments.csv:
csv_out = csv.DictWriter(
stream,
fieldnames=self.arguments.columns,
lineterminator="\n",
extrasaction="ignore",
)
csv_out.writeheader()
# dicts contain data by lower case key
csv_out.fieldnames = [col.lower() for col in self.arguments.columns]
try: try:
for dn, entry in search_iterator: for dn, entry in search_iterator:
num_responses += 1 num_responses += 1
if dn is None: if dn is None:
if not self.arguments.csv: if ldif_output:
print("# search reference") print("# search reference")
for ref in entry: for ref in entry:
assert isinstance(ref, str) assert isinstance(ref, str)
@ -98,10 +202,7 @@ class _Context:
assert not isinstance(entry, list) assert not isinstance(entry, list)
num_entries += 1 num_entries += 1
obj = decoder.read(dn=dn, entry=entry) obj = decoder.read(dn=dn, entry=entry)
if self.arguments.csv: decoder.emit(dn=dn, entry=obj)
csv_out.writerow(decoder.human(dn=dn, entry=obj))
else:
decoder.emit(dn=dn, entry=obj)
except SizeLimitExceeded as e: except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}") raise SystemExit(f"Error: {e}")

View File

@ -44,10 +44,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
default=False, default=False,
metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"), metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"),
) )
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in")) realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in"))
server: typing.Optional[str] = dataclasses.field( server: typing.Optional[str] = dataclasses.field(
default=None, default=None,
@ -77,22 +73,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
help="Explicit search base (defaults to root of domain / forest with --gc)", help="Explicit search base (defaults to root of domain / forest with --gc)",
), ),
) )
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
def __post_init__(self) -> None: def __post_init__(self) -> None:
if not self.filter is None: if not self.filter is None:
@ -107,19 +87,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
# probably doesn't like empty filter? # probably doesn't like empty filter?
self.filter = "(objectClass=*)" self.filter = "(objectClass=*)"
# can't print both csv and markdown
if self.csv and self.table:
raise SystemExit("Can't use both --table and --csv")
if self.sort:
if not self.table and not self.csv:
# default to markdown table
self.table = True
if self.table:
# markdown requires underlying csv
self.csv = True
# extract special attribute names # extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order) self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
@ -135,17 +102,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):
if self.columns and not self.attributes: if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"] self.attributes = ["dn"]
if self.csv:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.json:
raise SystemExit("Can't use both --table / --csv / --sort and --json")
if self.human:
raise SystemExit("Can't use both --table / --csv / --sort and --human")
if self.raw:
if self.csv:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.json or self.human:
raise SystemExit("Decode options require decode; --raw not allowed")