Compare commits

..

No commits in common. "1c5b971d8635a6e324024e66184354609d62f9a7" and "a08154cff83161f1bd9667dc01cee554093b5763" have entirely different histories.

3 changed files with 71 additions and 126 deletions

View File

@ -11,8 +11,7 @@ CLI tool to query LDAP/AD servers
* Classic LDIF
* JSON stream (with detailed or simplified attribute values)
* CSV
* Markdown table with stretched columns (for viewing in CLI/for monospaces fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
* HTML
* Markdown table with stretched columns (for viewing in CLI/for monospaces fonts)
* Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl)
* Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination
* By default the first 1000 entries are shown, and it errors if there are more results

View File

@ -2,97 +2,18 @@ from __future__ import annotations
import argparse
import csv
import dataclasses
import enum
import html
import subprocess
import sys
import typing
from ldaptool import decode, search
from ldaptool._utils import argclasses
from ldaptool._utils.ldap import Result, SizeLimitExceeded
class TableOutput(enum.StrEnum):
MARKDOWN = "markdown"
CSV = "csv"
HTML = "html"
def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
cell_s = f"<{cell}>"
cell_e = f"</{cell}>"
return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"
@dataclasses.dataclass(slots=True, kw_only=True)
class Arguments(search.Arguments):
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
table_output: typing.Optional[TableOutput] = None
html: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="HTML table output - requires list of attributes",
),
)
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
def __post_init__(self) -> None:
super(Arguments, self).__post_init__() # super() not working here, unclear why.
# pick at most one in csv, (markdown) table, html
if [self.csv, self.table, self.html].count(True) > 1:
raise SystemExit("Can't use more than one table output type")
if self.csv:
self.table_output = TableOutput.CSV
elif self.table:
self.table_output = TableOutput.MARKDOWN
elif self.html:
self.table_output = TableOutput.HTML
if self.sort and self.table_output is None:
# default to markdown table
self.table_output = TableOutput.MARKDOWN
if self.table_output:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.json:
raise SystemExit("Can't use both table output and --json")
if self.human:
raise SystemExit("Can't use both table output and --human")
if self.raw:
if self.table_output:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.json or self.human:
raise SystemExit("Decode options require decode; --raw not allowed")
class _Context:
def __init__(self) -> None:
parser = argparse.ArgumentParser()
arguments_p = Arguments.add_to_parser(parser)
arguments_p = search.Arguments.add_to_parser(parser)
args = parser.parse_args()
try:
self.config = search.Config.load()
@ -119,60 +40,24 @@ class _Context:
output = proc.stdin
try:
if self.arguments.table_output == TableOutput.MARKDOWN:
if self.arguments.table:
add_filter(["csvlook"])
if self.arguments.table_output:
self._table_output(search_iterator, stream=output)
else:
self._ldif_or_json_output(search_iterator, stream=output)
if self.arguments.sort:
add_filter(["csvsort", "--blanks"])
self._run_search(search_iterator, stream=output)
finally:
if procs:
output.close()
for proc in reversed(procs):
proc.wait()
def _to_table_lines(self, search_iterator: typing.Iterable[Result]) -> typing.Iterable[tuple[str, ...]]:
decoder = decode.Decoder(arguments=self.arguments)
# "human" (json) dicts contain data by lower case key:
column_keys = [col.lower() for col in self.arguments.columns]
try:
for dn, entry in search_iterator:
if dn is None:
continue
# normal entry
assert not isinstance(entry, list)
obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
yield tuple(obj.get(key, "") for key in column_keys)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")
def _table_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
line_iterator = self._to_table_lines(search_iterator)
if self.arguments.sort:
line_iterator = sorted(line_iterator)
if self.arguments.table_output in [TableOutput.CSV, TableOutput.MARKDOWN]:
csv_out = csv.writer(stream, lineterminator="\n")
csv_out.writerow(self.arguments.columns)
for line in line_iterator:
csv_out.writerow(line)
else:
assert self.arguments.table_output == TableOutput.HTML
stream.write("<table>\n")
stream.write(_html_escape_line(self.arguments.columns, cell="th"))
for line in line_iterator:
stream.write(_html_escape_line(line))
stream.write("</table>\n")
def _ldif_or_json_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
def _run_search(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
decoder = decode.Decoder(arguments=self.arguments)
num_responses = 0
num_entries = 0
ldif_output = not (self.arguments.json or self.arguments.human)
ldif_output = not (self.arguments.csv or self.arguments.json or self.arguments.human)
if ldif_output:
print("# extended LDIF")
@ -187,11 +72,22 @@ class _Context:
print("#")
print()
if self.arguments.csv:
csv_out = csv.DictWriter(
stream,
fieldnames=self.arguments.columns,
lineterminator="\n",
extrasaction="ignore",
)
csv_out.writeheader()
# dicts contain data by lower case key
csv_out.fieldnames = [col.lower() for col in self.arguments.columns]
try:
for dn, entry in search_iterator:
num_responses += 1
if dn is None:
if ldif_output:
if not self.arguments.csv:
print("# search reference")
for ref in entry:
assert isinstance(ref, str)
@ -202,6 +98,9 @@ class _Context:
assert not isinstance(entry, list)
num_entries += 1
obj = decoder.read(dn=dn, entry=entry)
if self.arguments.csv:
csv_out.writerow(decoder.human(dn=dn, entry=obj))
else:
decoder.emit(dn=dn, entry=obj)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")

View File

@ -44,6 +44,10 @@ class Arguments(ldaptool.decode.arguments.Arguments):
default=False,
metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"),
)
raw: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
)
realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in"))
server: typing.Optional[str] = dataclasses.field(
default=None,
@ -73,6 +77,22 @@ class Arguments(ldaptool.decode.arguments.Arguments):
help="Explicit search base (defaults to root of domain / forest with --gc)",
),
)
csv: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(help="CSV output - requires list of attributes"),
)
table: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Markdown table output - requires list of attributes",
),
)
sort: bool = dataclasses.field(
default=False,
metadata=argclasses.arg(
help="Sorted table output - defaults to markdown --table unless --csv is given",
),
)
def __post_init__(self) -> None:
if not self.filter is None:
@ -87,6 +107,19 @@ class Arguments(ldaptool.decode.arguments.Arguments):
# probably doesn't like empty filter?
self.filter = "(objectClass=*)"
# can't print both csv and markdown
if self.csv and self.table:
raise SystemExit("Can't use both --table and --csv")
if self.sort:
if not self.table and not self.csv:
# default to markdown table
self.table = True
if self.table:
# markdown requires underlying csv
self.csv = True
# extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
@ -102,3 +135,17 @@ class Arguments(ldaptool.decode.arguments.Arguments):
if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"]
if self.csv:
if not self.columns:
raise SystemExit("Table output requires attributes")
if self.json:
raise SystemExit("Can't use both --table / --csv / --sort and --json")
if self.human:
raise SystemExit("Can't use both --table / --csv / --sort and --human")
if self.raw:
if self.csv:
raise SystemExit("Table output requires decode; --raw not allowed")
if self.json or self.human:
raise SystemExit("Decode options require decode; --raw not allowed")