Compare commits: ldaptool-0...main (4 commits: e8a23e0ede, 125eea5afc, a936734cee, dd225c8b7a)
```diff
@@ -9,7 +9,7 @@ CLI tool to query LDAP/AD servers
 * Integration with password managers
 * Various output formats
 * Classic LDIF
-* JSON stream (with detailed or simplified attribute values)
+* JSON stream (with simplified or detailed attribute values)
 * CSV
 * Markdown table with stretched columns (for viewing in CLI/for monospaces fonts); requires csvlook from [csvkit](https://csvkit.readthedocs.io/)
 * HTML
```
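The difference between the two JSON stream variants is spelled out by the CLI help texts in the hunks below. A minimal sketch of the shapes, with the entry data and the keys of the detailed per-value dicts assumed for illustration (they are not taken from ldaptool's actual output):

```python
# Sketch of the two JSON stream shapes; entry data and the "human" key
# in the detailed form are assumptions, not ldaptool's real output.
import json

# --json: dn as str, attributes map to lists of human-readable strings
simplified = {"dn": "cn=admins,dc=example,dc=org", "groupType": ["SCOPE_GLOBAL, SECURITY"]}
# --full-json: attributes as lists of dicts with various representations
detailed = {"dn": "cn=admins,dc=example,dc=org", "groupType": [{"human": "SCOPE_GLOBAL, SECURITY"}]}

print(json.dumps(simplified, ensure_ascii=False))
print(json.dumps(detailed, ensure_ascii=False))
```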
```diff
@@ -55,13 +55,17 @@ class Arguments(search.Arguments):
             help="Sorted table output - defaults to markdown --table unless --csv is given",
         ),
     )
+    full_json: bool = dataclasses.field(
+        default=False,
+        metadata=argclasses.arg(
+            help="Use full json output (dn as str, attributes as list of dicts containing various represenatations)",
+        ),
+    )
     json: bool = dataclasses.field(
         default=False,
-        metadata=argclasses.arg(help="Use full json output"),
-    )
-    human: bool = dataclasses.field(
-        default=False,
-        metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
+        metadata=argclasses.arg(
+            help="Use simple json output (dn as str, attributes map to list of human-readable strings)",
+        ),
     )

     def __post_init__(self) -> None:
```
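These fields rely on argclasses to turn dataclass fields into argparse flags, so the field name `full_json` surfaces as a `--full-json` switch. A rough, hypothetical sketch of that pattern (not argclasses' real implementation):

```python
# Hypothetical sketch of metadata-driven flag generation; argclasses'
# real API differs, this only illustrates the dataclass -> argparse idea.
import argparse
import dataclasses


@dataclasses.dataclass
class Opts:
    full_json: bool = dataclasses.field(default=False, metadata={"help": "Use full json output"})
    json: bool = dataclasses.field(default=False, metadata={"help": "Use simple json output"})


parser = argparse.ArgumentParser()
for field in dataclasses.fields(Opts):
    parser.add_argument(
        f"--{field.name.replace('_', '-')}",  # full_json -> --full-json
        action="store_true",
        help=field.metadata.get("help"),
    )

print(parser.parse_args(["--full-json"]))  # Namespace(full_json=True, json=False)
```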
```diff
@@ -85,15 +89,15 @@ class Arguments(search.Arguments):
         if self.table_output:
             if not self.columns:
                 raise SystemExit("Table output requires attributes")
-            if self.json:
+            if self.full_json:
                 raise SystemExit("Can't use both table output and --json")
-            if self.human:
+            if self.json:
                 raise SystemExit("Can't use both table output and --human")

         if self.raw:
             if self.table_output:
                 raise SystemExit("Table output requires decode; --raw not allowed")
-            if self.json or self.human:
+            if self.full_json or self.json:
                 raise SystemExit("Decode options require decode; --raw not allowed")

```
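Because these checks run in `__post_init__`, invalid flag combinations abort before any LDAP connection is attempted. A minimal sketch of the pattern, using an assumed stand-in class rather than ldaptool's real `Arguments`:

```python
# Minimal sketch of __post_init__ flag validation (stand-in class, not
# ldaptool's real Arguments).
import dataclasses


@dataclasses.dataclass
class Opts:
    table_output: bool = False
    full_json: bool = False

    def __post_init__(self) -> None:
        if self.table_output and self.full_json:
            raise SystemExit("Can't use both table output and --json")


Opts(table_output=True)  # fine
# Opts(table_output=True, full_json=True) would exit with the message above
```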
```diff
@@ -183,7 +187,7 @@ class _Context:
         num_responses = 0
         num_entries = 0

-        ldif_output = not (self.arguments.json or self.arguments.human)
+        ldif_output = not (self.arguments.full_json or self.arguments.json)

         if ldif_output:
             print("# extended LDIF")
```
```diff
@@ -214,11 +218,11 @@ class _Context:
                 num_entries += 1
                 if ldif_output:
                     decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
-                elif self.arguments.human:
-                    decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
+                elif self.arguments.json:
+                    decoder.read_and_emit_simple_json(dn=dn, entry=entry, file=stream)
                 else:
-                    assert self.arguments.json
-                    decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
+                    assert self.arguments.full_json
+                    decoder.read_and_emit_full_json(dn=dn, entry=entry, file=stream)
         except SizeLimitExceeded as e:
             raise SystemExit(f"Error: {e}")

```
```diff
@@ -100,6 +100,13 @@ class Attribute:
         except Exception:
             return

+    def _try_decode_grouptype(self) -> None:
+        if self.utf8_clean:
+            try:
+                self.decoded = _types.grouptype.parse(self.utf8_clean.strip())
+            except Exception:
+                return
+
     def _try_decode(self, args: Arguments) -> None:
         if self.name in ("objectSid", "securityIdentifier"):
             self._try_decode_sid()
```
```diff
@@ -115,6 +122,8 @@ class Attribute:
             self._try_decode_timestamp(args)
         elif self.name == "userAccountControl":
             self._try_decode_uac()
+        elif self.name == "groupType":
+            self._try_decode_grouptype()

     @property
     def _base64_value(self) -> str:
```
```diff
@@ -195,27 +204,33 @@ class Decoder:
             emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
         return emit

-    def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
-        emit = self.human(dn=dn, obj=obj)
+    def simple_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
+        emit: dict[str, typing.Any] = dict(dn=dn)
+        for name, attrs in obj.items():
+            emit[name] = [attr.human() for attr in attrs]
+        return emit
+
+    def emit_simple_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.simple_json(dn=dn, obj=obj)
         json.dump(emit, file, ensure_ascii=False)
         print(file=file)  # terminate output dicts by newline

-    def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
-        self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
+    def read_and_emit_simple_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_simple_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

-    def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
+    def full_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
         emit: dict[str, typing.Any] = dict(dn=dn)
         for name, attrs in obj.items():
             emit[name] = [attr.to_json() for attr in attrs]
         return emit

-    def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
-        emit = self.json(dn=dn, obj=obj)
+    def emit_full_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
+        emit = self.full_json(dn=dn, obj=obj)
         json.dump(emit, file, ensure_ascii=False)
         print(file=file)  # terminate output dicts by newline

-    def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
-        self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
+    def read_and_emit_full_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
+        self.emit_full_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)

     def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
         print(f"dn: {dn}", file=file)
```
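Both emitters write one JSON object per entry and terminate it with a newline, so the stream can be consumed line by line. A sketch of a consumer (the file name is an assumption for captured ldaptool output):

```python
# Sketch of consuming the newline-delimited JSON stream; "entries.json"
# is an assumed file name for captured ldaptool output.
import json

with open("entries.json", encoding="utf-8") as fh:
    for line in fh:
        entry = json.loads(line)
        print(entry["dn"])
```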
```diff
@@ -1,8 +1,9 @@
 from __future__ import annotations

-from . import sid, timestamp, uac
+from . import grouptype, sid, timestamp, uac

 __all__ = [
+    "grouptype",
     "sid",
     "timestamp",
     "uac",
```
src/ldaptool/decode/_types/grouptype.py (new file, 29 lines)

```diff
@@ -0,0 +1,29 @@
+from __future__ import annotations
+
+import enum
+import typing
+
+
+class GroupTypeFlags(enum.IntFlag):
+    SYSTEM = 0x00000001
+    SCOPE_GLOBAL = 0x00000002
+    SCOPE_DOMAIN = 0x00000004
+    SCOPE_UNIVERSAL = 0x00000008
+    APP_BASIC = 0x00000010
+    APP_QUERY = 0x00000020
+    SECURITY = 0x80000000  # otherwise distribution
+
+    def flags(self) -> list[GroupTypeFlags]:
+        # ignore "uncovered" bits for now
+        value = self.value
+        members = []
+        for member in GroupTypeFlags:
+            member_value = member.value
+            if member_value and member_value & value == member_value:
+                members.append(member)
+        return members
+
+
+def parse(value: str) -> str:
+    members = GroupTypeFlags(int(value)).flags()
+    return ", ".join(typing.cast(str, member.name) for member in members)
```