Compare commits
5 Commits: a08154cff8...1c5b971d86

| Author | SHA1 | Date |
| --- | --- | --- |
|  | 1c5b971d86 |  |
|  | 30d8f9f350 |  |
|  | dbaf301911 |  |
|  | c412af3de0 |  |
|  | 3a8c61ff59 |  |
@@ -11,7 +11,8 @@ CLI tool to query LDAP/AD servers
 * Classic LDIF
 * JSON stream (with detailed or simplified attribute values)
 * CSV
-* Markdown table with stretched columns (for viewing in CLI/for monospace fonts)
+* Markdown table with stretched columns (for viewing in CLI/for monospace fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
+* HTML
 * Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl)
 * Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination
 * By default the first 1000 entries are shown, and it errors if there are more results
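Editor's note: the markdown table mode shells out to csvlook, so the actual rendering is csvkit's. Below is a rough, hedged sketch of that piping with invented sample rows (it assumes csvkit is installed; the exact table style depends on the csvkit version):

```python
# Sketch only: feed CSV text through csvlook, as the tool's filter chain does.
import subprocess

rows = "cn,mail\nalice,alice@example.com\nbob,bob@example.com\n"
proc = subprocess.run(["csvlook"], input=rows, capture_output=True, text=True, check=True)
print(proc.stdout)
# Prints a stretched-column table along the lines of:
# | cn    | mail              |
# | ----- | ----------------- |
# | alice | alice@example.com |
# | bob   | bob@example.com   |
```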
@@ -2,18 +2,97 @@ from __future__ import annotations

 import argparse
 import csv
 import dataclasses
+import enum
+import html
 import subprocess
 import sys
 import typing

 from ldaptool import decode, search
 from ldaptool._utils import argclasses
 from ldaptool._utils.ldap import Result, SizeLimitExceeded


+class TableOutput(enum.StrEnum):
+    MARKDOWN = "markdown"
+    CSV = "csv"
+    HTML = "html"
+
+
+def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
+    cell_s = f"<{cell}>"
+    cell_e = f"</{cell}>"
+    return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"
+
+
+@dataclasses.dataclass(slots=True, kw_only=True)
+class Arguments(search.Arguments):
+    raw: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
+    )
+    csv: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(help="CSV output - requires list of attributes"),
+    )
+    table: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="Markdown table output - requires list of attributes",
+        ),
+    )
+    table_output: typing.Optional[TableOutput] = None
+    html: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="HTML table output - requires list of attributes",
+        ),
+    )
+    sort: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="Sorted table output - defaults to markdown --table unless --csv is given",
+        ),
+    )
+
+    def __post_init__(self) -> None:
+        super(Arguments, self).__post_init__()  # super() not working here, unclear why.
+
+        # pick at most one in csv, (markdown) table, html
+        if [self.csv, self.table, self.html].count(True) > 1:
+            raise SystemExit("Can't use more than one table output type")
+
+        if self.csv:
+            self.table_output = TableOutput.CSV
+        elif self.table:
+            self.table_output = TableOutput.MARKDOWN
+        elif self.html:
+            self.table_output = TableOutput.HTML
+
+        if self.sort and self.table_output is None:
+            # default to markdown table
+            self.table_output = TableOutput.MARKDOWN
+
+        if self.table_output:
+            if not self.columns:
+                raise SystemExit("Table output requires attributes")
+            if self.json:
+                raise SystemExit("Can't use both table output and --json")
+            if self.human:
+                raise SystemExit("Can't use both table output and --human")
+
+        if self.raw:
+            if self.table_output:
+                raise SystemExit("Table output requires decode; --raw not allowed")
+            if self.json or self.human:
+                raise SystemExit("Decode options require decode; --raw not allowed")


 class _Context:
     def __init__(self) -> None:
         parser = argparse.ArgumentParser()
-        arguments_p = search.Arguments.add_to_parser(parser)
+        arguments_p = Arguments.add_to_parser(parser)
         args = parser.parse_args()
         try:
             self.config = search.Config.load()
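Editor's note: the escaping helper added above builds exactly one table row per call. A self-contained check, with the function body copied verbatim from the hunk and invented column names:

```python
import html
import typing

def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
    cell_s = f"<{cell}>"
    cell_e = f"</{cell}>"
    return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"

# Header row with <th> cells, data rows with the default <td>:
assert _html_escape_line(["cn", "mail"], cell="th") == "<tr><th>cn</th><th>mail</th></tr>\n"
# Values are HTML-escaped, so markup inside LDAP data can't break the table:
assert _html_escape_line(["a<b&c"]) == "<tr><td>a&lt;b&amp;c</td></tr>\n"
```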
@@ -40,24 +119,60 @@ class _Context:

         output = proc.stdin

         try:
-            if self.arguments.table:
+            if self.arguments.table_output == TableOutput.MARKDOWN:
                 add_filter(["csvlook"])
             if self.arguments.sort:
                 add_filter(["csvsort", "--blanks"])
-            self._run_search(search_iterator, stream=output)
+            if self.arguments.table_output:
+                self._table_output(search_iterator, stream=output)
+            else:
+                self._ldif_or_json_output(search_iterator, stream=output)
         finally:
             if procs:
                 output.close()
                 for proc in reversed(procs):
                     proc.wait()

-    def _run_search(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
+    def _to_table_lines(self, search_iterator: typing.Iterable[Result]) -> typing.Iterable[tuple[str, ...]]:
         decoder = decode.Decoder(arguments=self.arguments)
+        # "human" (json) dicts contain data by lower case key:
+        column_keys = [col.lower() for col in self.arguments.columns]
+        try:
+            for dn, entry in search_iterator:
+                if dn is None:
+                    continue
+                # normal entry
+                assert not isinstance(entry, list)
+                obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
+                yield tuple(obj.get(key, "") for key in column_keys)
+        except SizeLimitExceeded as e:
+            raise SystemExit(f"Error: {e}")
+
+    def _table_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
+        line_iterator = self._to_table_lines(search_iterator)
+        if self.arguments.sort:
+            line_iterator = sorted(line_iterator)
+
+        if self.arguments.table_output in [TableOutput.CSV, TableOutput.MARKDOWN]:
+            csv_out = csv.writer(stream, lineterminator="\n")
+            csv_out.writerow(self.arguments.columns)
+
+            for line in line_iterator:
+                csv_out.writerow(line)
+        else:
+            assert self.arguments.table_output == TableOutput.HTML
+
+            stream.write("<table>\n")
+            stream.write(_html_escape_line(self.arguments.columns, cell="th"))
+            for line in line_iterator:
+                stream.write(_html_escape_line(line))
+            stream.write("</table>\n")
+
+    def _ldif_or_json_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
+        decoder = decode.Decoder(arguments=self.arguments)

         num_responses = 0
         num_entries = 0

-        ldif_output = not (self.arguments.csv or self.arguments.json or self.arguments.human)
+        ldif_output = not (self.arguments.json or self.arguments.human)

         if ldif_output:
             print("# extended LDIF")
@@ -72,22 +187,11 @@ class _Context:

             print("#")
             print()

-        if self.arguments.csv:
-            csv_out = csv.DictWriter(
-                stream,
-                fieldnames=self.arguments.columns,
-                lineterminator="\n",
-                extrasaction="ignore",
-            )
-            csv_out.writeheader()
-            # dicts contain data by lower case key
-            csv_out.fieldnames = [col.lower() for col in self.arguments.columns]

         try:
             for dn, entry in search_iterator:
                 num_responses += 1
                 if dn is None:
-                    if not self.arguments.csv:
+                    if ldif_output:
                         print("# search reference")
                     for ref in entry:
                         assert isinstance(ref, str)

@@ -98,10 +202,7 @@

                 assert not isinstance(entry, list)
                 num_entries += 1
                 obj = decoder.read(dn=dn, entry=entry)
-                if self.arguments.csv:
-                    csv_out.writerow(decoder.human(dn=dn, entry=obj))
-                else:
-                    decoder.emit(dn=dn, entry=obj)
+                decoder.emit(dn=dn, entry=obj)
         except SizeLimitExceeded as e:
             raise SystemExit(f"Error: {e}")
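Editor's note: both output paths consume the same `Result` stream and branch on `dn is None` to tell search references apart from normal entries. A stub iterator showing that shape; the alias and sample values are assumptions inferred from the branching above, not the real `ldaptool._utils.ldap` types:

```python
import typing

# Assumed shape: entries carry a DN and an attribute mapping; search
# references carry dn=None and a list of referral URI strings.
Result = tuple[typing.Optional[str], typing.Union[dict[str, list[bytes]], list[str]]]

def fake_results() -> typing.Iterator[Result]:
    yield ("cn=alice,dc=example,dc=com", {"cn": [b"alice"]})
    yield (None, ["ldap://other.example.com/dc=example,dc=com"])

for dn, entry in fake_results():
    if dn is None:
        for ref in entry:           # search reference: entry is a list of strings
            print("# search reference:", ref)
    else:
        print("dn:", dn)            # normal entry: entry maps attributes to values
```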
@@ -44,10 +44,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):

         default=False,
         metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"),
     )
-    raw: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
-    )
     realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in"))
     server: typing.Optional[str] = dataclasses.field(
         default=None,

@@ -77,22 +73,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):

             help="Explicit search base (defaults to root of domain / forest with --gc)",
         ),
     )
-    csv: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="CSV output - requires list of attributes"),
-    )
-    table: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="Markdown table output - requires list of attributes",
-        ),
-    )
-    sort: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(
-            help="Sorted table output - defaults to markdown --table unless --csv is given",
-        ),
-    )

     def __post_init__(self) -> None:
         if not self.filter is None:

@@ -107,19 +87,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):

             # probably doesn't like empty filter?
             self.filter = "(objectClass=*)"

-        # can't print both csv and markdown
-        if self.csv and self.table:
-            raise SystemExit("Can't use both --table and --csv")
-
-        if self.sort:
-            if not self.table and not self.csv:
-                # default to markdown table
-                self.table = True
-
-        if self.table:
-            # markdown requires underlying csv
-            self.csv = True
-
         # extract special attribute names
         self.columns = self.attributes  # use all names for columns (headings and their order)
         attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes}  # index by lowercase name

@@ -135,17 +102,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):

         if self.columns and not self.attributes:
             # if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
             self.attributes = ["dn"]
-
-        if self.csv:
-            if not self.columns:
-                raise SystemExit("Table output requires attributes")
-            if self.json:
-                raise SystemExit("Can't use both --table / --csv / --sort and --json")
-            if self.human:
-                raise SystemExit("Can't use both --table / --csv / --sort and --human")
-
-        if self.raw:
-            if self.csv:
-                raise SystemExit("Table output requires decode; --raw not allowed")
-            if self.json or self.human:
-                raise SystemExit("Decode options require decode; --raw not allowed")
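Editor's note: the "index by lowercase name" comment kept in this hunk is the same convention the new `_to_table_lines` relies on: decoded entries are keyed by lowercased attribute names, while the user-supplied spelling is kept for headings. A tiny illustration with assumed attribute names and a hand-written decoded entry:

```python
columns = ["sAMAccountName", "mail"]            # user-supplied spelling, used as headings
column_keys = [col.lower() for col in columns]  # decoded dicts are keyed lowercase

entry = {"samaccountname": "alice", "mail": "alice@example.com"}  # assumed decoded row
row = tuple(entry.get(key, "") for key in column_keys)
print(row)   # ('alice', 'alice@example.com')
```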