ldaptool/src/ldaptool/decode/_decoder.py

242 lines
7.9 KiB
Python

from __future__ import annotations
import base64
import dataclasses
import json
import re
import sys
import typing
import uuid
from . import _types
from .arguments import Arguments
TEntry = dict[str, list[bytes]]
TDecoded = dict[str, list["Attribute"]]
CTRL = re.compile(r"[\x00-\x19]")
@dataclasses.dataclass(slots=True, kw_only=True)
class Attribute:
name: str
raw: bytes
utf8_clean: typing.Optional[str]
decoded: typing.Optional[str]
@typing.overload
def __init__(
self,
*,
name: str,
raw: bytes,
arguments: Arguments,
) -> None:
...
@typing.overload
def __init__(
self,
*,
name: str,
raw: bytes,
_utf8_clean: str,
) -> None:
...
def __init__(
self,
*,
name: str,
raw: bytes,
arguments: typing.Optional[Arguments] = None,
_utf8_clean: typing.Optional[str] = None,
) -> None:
self.name = name
self.raw = raw
self.utf8_clean = None
self.decoded = None
if not _utf8_clean is None:
# building fake attribute; no decoding
self.utf8_clean = _utf8_clean
return
assert arguments, "Need arguments for proper decoding"
try:
utf8_clean = raw.decode()
if not CTRL.search(utf8_clean):
self.utf8_clean = utf8_clean
except Exception:
# UTF-8 decode error
pass
self._try_decode(arguments)
def _try_decode_sid(self) -> None:
try:
self.decoded = _types.sid.parse_raw(self.raw)
except Exception:
return
def _try_decode_uuid(self) -> None:
try:
self.decoded = str(uuid.UUID(bytes=self.raw))
except Exception:
return
def _try_decode_timestamp(self, args: Arguments) -> None:
if self.utf8_clean:
try:
date = _types.timestamp.parse(self.utf8_clean)
except Exception:
return
if args.dateonly:
self.decoded = str(date.date())
else:
self.decoded = str(date)
def _try_decode_uac(self) -> None:
if self.utf8_clean:
try:
self.decoded = _types.uac.parse(self.utf8_clean.strip())
except Exception:
return
def _try_decode(self, args: Arguments) -> None:
if self.name in ("objectSid","securityIdentifier"):
self._try_decode_sid()
elif self.name in ("msExchMailboxGuid", "objectGUID"):
self._try_decode_uuid()
elif self.name in (
"pwdLastSet",
"lastLogon", # DC local attribute, not synced
"lastLogonTimestamp", # set and synced across DCs if "more fresh" than msDS-LogonTimeSyncInterval
"badPasswordTime",
"accountExpires",
):
self._try_decode_timestamp(args)
elif self.name == "userAccountControl":
self._try_decode_uac()
@property
def _base64_value(self) -> str:
return base64.b64encode(self.raw).decode("ascii")
def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
if not self.decoded is None:
comment = self.utf8_clean
if comment is None:
comment = self._base64_value
print(f"{self.name}: {self.decoded} # {comment}", file=file)
elif not self.utf8_clean is None:
print(f"{self.name}: {self.utf8_clean}", file=file)
else:
print(f"{self.name}:: {self._base64_value}", file=file)
def to_json(self) -> dict[str, typing.Any]:
item: dict[str, typing.Any] = {}
b64_value = self._base64_value
item["binary"] = b64_value
if not self.utf8_clean is None:
item["ldif_value"] = self.utf8_clean
if not self.decoded is None:
item["human"] = self.decoded
elif not self.utf8_clean is None:
item["human"] = self.utf8_clean
else:
item["human"] = self._base64_value
item["human_is_base64"] = True
return item
def human(self) -> str:
if not self.decoded is None:
return self.decoded
elif not self.utf8_clean is None:
return self.utf8_clean
else:
return self._base64_value
@staticmethod
def fake_attribute(name: str, value: str) -> Attribute:
return Attribute(
name=name,
raw=value.encode(),
_utf8_clean=value,
)
@dataclasses.dataclass(slots=True, kw_only=True)
class Decoder:
arguments: Arguments
def read(self, *, dn: str, entry: TEntry) -> dict[str, list[Attribute]]:
# lowercase attribute name in decoded dict. attribute itself still knows original for LDIF output.
decoded_entry = {
name.lower(): [Attribute(name=name, raw=raw, arguments=self.arguments) for raw in raw_values]
for name, raw_values in entry.items()
}
for attr, post_processes in self.arguments.post_process.items():
if attr == "dn":
values = [dn]
else:
attrs = decoded_entry.get(attr, None)
if attrs is None:
continue
values = [at.human() for at in attrs]
for column, post_process in post_processes.items():
decoded_entry[column] = [
Attribute.fake_attribute(column, post_process.process(value)) for value in values
]
return decoded_entry
def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in obj.items():
emit[name] = [attr.human() for attr in attrs]
return emit
def emit_simple_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
emit = self.human(dn=dn, obj=obj)
json.dump(emit, file, ensure_ascii=False)
print(file=file) # terminate output dicts by newline
def read_and_emit_simple_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_simple_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def full_json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
emit: dict[str, typing.Any] = dict(dn=dn)
for name, attrs in obj.items():
emit[name] = [attr.to_json() for attr in attrs]
return emit
def emit_full_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
emit = self.full_json(dn=dn, obj=obj)
json.dump(emit, file, ensure_ascii=False)
print(file=file) # terminate output dicts by newline
def read_and_emit_full_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_full_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
print(f"dn: {dn}", file=file)
attrs: typing.Optional[list[Attribute]]
if not self.arguments.attributes:
# show all attributes - use order from server
for attrs in obj.values():
for attr in attrs:
attr.print(file=file)
else:
# only selected columns; use given order
for column in self.arguments.columns_keys:
if column == "dn":
continue # already printed dn
attrs = obj.get(column, None)
if attrs is None:
continue
for attr in attrs:
attr.print(file=file)
print(file=file) # separate entries with newlines
def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)