diff --git a/README.md b/README.md
index 4b24dbd..d1328ff 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,8 @@ CLI tool to query LDAP/AD servers
* Classic LDIF
* JSON stream (with detailed or simplified attribute values)
* CSV
- * Markdown table with stretched columns (for viewing in CLI/for monospaces fonts)
+ * Markdown table with stretched columns (for viewing in CLI/for monospace fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
+ * HTML
* Decodes certain well-known attributes (UUIDs, Timestamps, SID, userAccountControl)
* Requires server to support [RFC 2696: Simple Paged Results](https://www.rfc-editor.org/rfc/rfc2696) for proper pagination
* By default the first 1000 entries are shown, and it errors if there are more results
diff --git a/src/ldaptool/_main.py b/src/ldaptool/_main.py
index 527ec37..60f081e 100644
--- a/src/ldaptool/_main.py
+++ b/src/ldaptool/_main.py
@@ -2,18 +2,97 @@ from __future__ import annotations
import argparse
import csv
+import dataclasses
+import enum
+import html
import subprocess
import sys
import typing
from ldaptool import decode, search
+from ldaptool._utils import argclasses
from ldaptool._utils.ldap import Result, SizeLimitExceeded
+class TableOutput(enum.StrEnum):
+ MARKDOWN = "markdown"
+ CSV = "csv"
+ HTML = "html"
+
+
+def _html_escape_line(columns: typing.Sequence[str], *, cell: str = "td") -> str:
+    cell_s = f"<{cell}>"
+    cell_e = f"</{cell}>"
+    return "<tr>" + ("".join(cell_s + html.escape(col) + cell_e for col in columns)) + "</tr>\n"
+
+
+@dataclasses.dataclass(slots=True, kw_only=True)
+class Arguments(search.Arguments):
+ raw: bool = dataclasses.field(
+ default=False,
+ metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
+ )
+ csv: bool = dataclasses.field(
+ default=False,
+ metadata=argclasses.arg(help="CSV output - requires list of attributes"),
+ )
+ table: bool = dataclasses.field(
+ default=False,
+ metadata=argclasses.arg(
+ help="Markdown table output - requires list of attributes",
+ ),
+ )
+ table_output: typing.Optional[TableOutput] = None
+ html: bool = dataclasses.field(
+ default=False,
+ metadata=argclasses.arg(
+ help="HTML table output - requires list of attributes",
+ ),
+ )
+ sort: bool = dataclasses.field(
+ default=False,
+ metadata=argclasses.arg(
+ help="Sorted table output - defaults to markdown --table unless --csv is given",
+ ),
+ )
+
+ def __post_init__(self) -> None:
+        super(Arguments, self).__post_init__()  # zero-arg super() fails: @dataclass(slots=True) replaces the class, so the compiled __class__ cell is stale
+
+ # pick at most one in csv, (markdown) table, html
+ if [self.csv, self.table, self.html].count(True) > 1:
+ raise SystemExit("Can't use more than one table output type")
+
+ if self.csv:
+ self.table_output = TableOutput.CSV
+ elif self.table:
+ self.table_output = TableOutput.MARKDOWN
+ elif self.html:
+ self.table_output = TableOutput.HTML
+
+ if self.sort and self.table_output is None:
+ # default to markdown table
+ self.table_output = TableOutput.MARKDOWN
+
+ if self.table_output:
+ if not self.columns:
+ raise SystemExit("Table output requires attributes")
+ if self.json:
+ raise SystemExit("Can't use both table output and --json")
+ if self.human:
+ raise SystemExit("Can't use both table output and --human")
+
+ if self.raw:
+ if self.table_output:
+ raise SystemExit("Table output requires decode; --raw not allowed")
+ if self.json or self.human:
+ raise SystemExit("Decode options require decode; --raw not allowed")
+
+
class _Context:
def __init__(self) -> None:
parser = argparse.ArgumentParser()
- arguments_p = search.Arguments.add_to_parser(parser)
+ arguments_p = Arguments.add_to_parser(parser)
args = parser.parse_args()
try:
self.config = search.Config.load()
@@ -40,24 +119,60 @@ class _Context:
output = proc.stdin
try:
- if self.arguments.table:
+ if self.arguments.table_output == TableOutput.MARKDOWN:
add_filter(["csvlook"])
- if self.arguments.sort:
- add_filter(["csvsort", "--blanks"])
- self._run_search(search_iterator, stream=output)
+ if self.arguments.table_output:
+ self._table_output(search_iterator, stream=output)
+ else:
+ self._ldif_or_json_output(search_iterator, stream=output)
finally:
if procs:
output.close()
for proc in reversed(procs):
proc.wait()
- def _run_search(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
+ def _to_table_lines(self, search_iterator: typing.Iterable[Result]) -> typing.Iterable[tuple[str, ...]]:
+ decoder = decode.Decoder(arguments=self.arguments)
+ # "human" (json) dicts contain data by lower case key:
+ column_keys = [col.lower() for col in self.arguments.columns]
+ try:
+ for dn, entry in search_iterator:
+ if dn is None:
+ continue
+ # normal entry
+ assert not isinstance(entry, list)
+ obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
+ yield tuple(obj.get(key, "") for key in column_keys)
+ except SizeLimitExceeded as e:
+ raise SystemExit(f"Error: {e}")
+
+ def _table_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
+ line_iterator = self._to_table_lines(search_iterator)
+ if self.arguments.sort:
+ line_iterator = sorted(line_iterator)
+
+ if self.arguments.table_output in [TableOutput.CSV, TableOutput.MARKDOWN]:
+ csv_out = csv.writer(stream, lineterminator="\n")
+ csv_out.writerow(self.arguments.columns)
+
+ for line in line_iterator:
+ csv_out.writerow(line)
+ else:
+ assert self.arguments.table_output == TableOutput.HTML
+
+            stream.write("<table>\n")
+ stream.write(_html_escape_line(self.arguments.columns, cell="th"))
+ for line in line_iterator:
+ stream.write(_html_escape_line(line))
+            stream.write("</table>\n")
+
+ def _ldif_or_json_output(self, search_iterator: typing.Iterable[Result], *, stream: typing.IO[str]) -> None:
decoder = decode.Decoder(arguments=self.arguments)
num_responses = 0
num_entries = 0
- ldif_output = not (self.arguments.csv or self.arguments.json or self.arguments.human)
+ ldif_output = not (self.arguments.json or self.arguments.human)
if ldif_output:
print("# extended LDIF")
@@ -72,22 +187,11 @@ class _Context:
print("#")
print()
- if self.arguments.csv:
- csv_out = csv.DictWriter(
- stream,
- fieldnames=self.arguments.columns,
- lineterminator="\n",
- extrasaction="ignore",
- )
- csv_out.writeheader()
- # dicts contain data by lower case key
- csv_out.fieldnames = [col.lower() for col in self.arguments.columns]
-
try:
for dn, entry in search_iterator:
num_responses += 1
if dn is None:
- if not self.arguments.csv:
+ if ldif_output:
print("# search reference")
for ref in entry:
assert isinstance(ref, str)
@@ -98,10 +202,7 @@ class _Context:
assert not isinstance(entry, list)
num_entries += 1
obj = decoder.read(dn=dn, entry=entry)
- if self.arguments.csv:
- csv_out.writerow(decoder.human(dn=dn, entry=obj))
- else:
- decoder.emit(dn=dn, entry=obj)
+ decoder.emit(dn=dn, entry=obj)
except SizeLimitExceeded as e:
raise SystemExit(f"Error: {e}")
diff --git a/src/ldaptool/search/arguments.py b/src/ldaptool/search/arguments.py
index 3445cca..1eef170 100644
--- a/src/ldaptool/search/arguments.py
+++ b/src/ldaptool/search/arguments.py
@@ -44,10 +44,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
default=False,
metadata=argclasses.arg(help="Query global catalogue (and forest root as search base)"),
)
- raw: bool = dataclasses.field(
- default=False,
- metadata=argclasses.arg(help="Don't pipe output through ldap-decode"),
- )
realm: str = dataclasses.field(metadata=argclasses.arg(required=True, help="Realm to search in"))
server: typing.Optional[str] = dataclasses.field(
default=None,
@@ -77,22 +73,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
help="Explicit search base (defaults to root of domain / forest with --gc)",
),
)
- csv: bool = dataclasses.field(
- default=False,
- metadata=argclasses.arg(help="CSV output - requires list of attributes"),
- )
- table: bool = dataclasses.field(
- default=False,
- metadata=argclasses.arg(
- help="Markdown table output - requires list of attributes",
- ),
- )
- sort: bool = dataclasses.field(
- default=False,
- metadata=argclasses.arg(
- help="Sorted table output - defaults to markdown --table unless --csv is given",
- ),
- )
def __post_init__(self) -> None:
if not self.filter is None:
@@ -107,19 +87,6 @@ class Arguments(ldaptool.decode.arguments.Arguments):
# probably doesn't like empty filter?
self.filter = "(objectClass=*)"
- # can't print both csv and markdown
- if self.csv and self.table:
- raise SystemExit("Can't use both --table and --csv")
-
- if self.sort:
- if not self.table and not self.csv:
- # default to markdown table
- self.table = True
-
- if self.table:
- # markdown requires underlying csv
- self.csv = True
-
# extract special attribute names
self.columns = self.attributes # use all names for columns (headings and their order)
attributes_set: dict[str, str] = {arg.lower(): arg for arg in self.attributes} # index by lowercase name
@@ -135,17 +102,3 @@ class Arguments(ldaptool.decode.arguments.Arguments):
if self.columns and not self.attributes:
# if we only wanted fake attributes, make sure we only request 'dn' - empty list would query all attributes
self.attributes = ["dn"]
-
- if self.csv:
- if not self.columns:
- raise SystemExit("Table output requires attributes")
- if self.json:
- raise SystemExit("Can't use both --table / --csv / --sort and --json")
- if self.human:
- raise SystemExit("Can't use both --table / --csv / --sort and --human")
-
- if self.raw:
- if self.csv:
- raise SystemExit("Table output requires decode; --raw not allowed")
- if self.json or self.human:
- raise SystemExit("Decode options require decode; --raw not allowed")