move json output format handling to main tool from decoder
This commit is contained in:
parent
c03374d6df
commit
bc1eb65738
@ -42,19 +42,27 @@ class Arguments(search.Arguments):
|
|||||||
help="Markdown table output - requires list of attributes",
|
help="Markdown table output - requires list of attributes",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
table_output: typing.Optional[TableOutput] = None
|
|
||||||
html: bool = dataclasses.field(
|
html: bool = dataclasses.field(
|
||||||
default=False,
|
default=False,
|
||||||
metadata=argclasses.arg(
|
metadata=argclasses.arg(
|
||||||
help="HTML table output - requires list of attributes",
|
help="HTML table output - requires list of attributes",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
table_output: typing.Optional[TableOutput] = None
|
||||||
sort: bool = dataclasses.field(
|
sort: bool = dataclasses.field(
|
||||||
default=False,
|
default=False,
|
||||||
metadata=argclasses.arg(
|
metadata=argclasses.arg(
|
||||||
help="Sorted table output - defaults to markdown --table unless --csv is given",
|
help="Sorted table output - defaults to markdown --table unless --csv is given",
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
json: bool = dataclasses.field(
|
||||||
|
default=False,
|
||||||
|
metadata=argclasses.arg(help="Use full json output"),
|
||||||
|
)
|
||||||
|
human: bool = dataclasses.field(
|
||||||
|
default=False,
|
||||||
|
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
|
||||||
|
)
|
||||||
|
|
||||||
def __post_init__(self) -> None:
|
def __post_init__(self) -> None:
|
||||||
super(Arguments, self).__post_init__() # super() not working here, unclear why.
|
super(Arguments, self).__post_init__() # super() not working here, unclear why.
|
||||||
@ -141,7 +149,7 @@ class _Context:
|
|||||||
continue
|
continue
|
||||||
# normal entry
|
# normal entry
|
||||||
assert not isinstance(entry, list)
|
assert not isinstance(entry, list)
|
||||||
obj = decoder.human(dn=dn, entry=decoder.read(dn=dn, entry=entry))
|
obj = decoder.human(dn=dn, obj=decoder.read(dn=dn, entry=entry))
|
||||||
yield tuple(obj.get(key, "") for key in column_keys)
|
yield tuple(obj.get(key, "") for key in column_keys)
|
||||||
except SizeLimitExceeded as e:
|
except SizeLimitExceeded as e:
|
||||||
raise SystemExit(f"Error: {e}")
|
raise SystemExit(f"Error: {e}")
|
||||||
@ -201,8 +209,13 @@ class _Context:
|
|||||||
# normal entry
|
# normal entry
|
||||||
assert not isinstance(entry, list)
|
assert not isinstance(entry, list)
|
||||||
num_entries += 1
|
num_entries += 1
|
||||||
obj = decoder.read(dn=dn, entry=entry)
|
if ldif_output:
|
||||||
decoder.emit(dn=dn, entry=obj)
|
decoder.read_and_emit_ldif(dn=dn, entry=entry, file=stream)
|
||||||
|
elif self.arguments.human:
|
||||||
|
decoder.read_and_emit_human(dn=dn, entry=entry, file=stream)
|
||||||
|
else:
|
||||||
|
assert self.arguments.json
|
||||||
|
decoder.read_and_emit_json(dn=dn, entry=entry, file=stream)
|
||||||
except SizeLimitExceeded as e:
|
except SizeLimitExceeded as e:
|
||||||
raise SystemExit(f"Error: {e}")
|
raise SystemExit(f"Error: {e}")
|
||||||
|
|
||||||
|
@ -122,16 +122,16 @@ class Attribute:
|
|||||||
def _base64_value(self) -> str:
|
def _base64_value(self) -> str:
|
||||||
return base64.b64encode(self.raw).decode("ascii")
|
return base64.b64encode(self.raw).decode("ascii")
|
||||||
|
|
||||||
def print(self) -> None:
|
def print(self, *, file: typing.IO[str] = sys.stdout) -> None:
|
||||||
if not self.decoded is None:
|
if not self.decoded is None:
|
||||||
comment = self.utf8_clean
|
comment = self.utf8_clean
|
||||||
if comment is None:
|
if comment is None:
|
||||||
comment = self._base64_value
|
comment = self._base64_value
|
||||||
print(f"{self.name}: {self.decoded} # {comment}")
|
print(f"{self.name}: {self.decoded} # {comment}", file=file)
|
||||||
elif not self.utf8_clean is None:
|
elif not self.utf8_clean is None:
|
||||||
print(f"{self.name}: {self.utf8_clean}")
|
print(f"{self.name}: {self.utf8_clean}", file=file)
|
||||||
else:
|
else:
|
||||||
print(f"{self.name}:: {self._base64_value}")
|
print(f"{self.name}:: {self._base64_value}", file=file)
|
||||||
|
|
||||||
def to_json(self) -> dict[str, typing.Any]:
|
def to_json(self) -> dict[str, typing.Any]:
|
||||||
item: dict[str, typing.Any] = {}
|
item: dict[str, typing.Any] = {}
|
||||||
@ -191,39 +191,40 @@ class Decoder:
|
|||||||
]
|
]
|
||||||
return decoded_entry
|
return decoded_entry
|
||||||
|
|
||||||
def human(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
|
def human(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
|
||||||
emit: dict[str, typing.Any] = dict(dn=dn)
|
emit: dict[str, typing.Any] = dict(dn=dn)
|
||||||
for name, attrs in entry.items():
|
for name, attrs in obj.items():
|
||||||
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
|
emit[name] = self.arguments.human_separator.join(attr.human() for attr in attrs)
|
||||||
return emit
|
return emit
|
||||||
|
|
||||||
def json(self, *, dn: str, entry: TDecoded) -> dict[str, str]:
|
def emit_human(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
|
||||||
|
emit = self.human(dn=dn, obj=obj)
|
||||||
|
json.dump(emit, file, ensure_ascii=False)
|
||||||
|
print(file=file) # terminate output dicts by newline
|
||||||
|
|
||||||
|
def read_and_emit_human(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
|
||||||
|
self.emit_human(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
|
||||||
|
|
||||||
|
def json(self, *, dn: str, obj: TDecoded) -> dict[str, str]:
|
||||||
emit: dict[str, typing.Any] = dict(dn=dn)
|
emit: dict[str, typing.Any] = dict(dn=dn)
|
||||||
for name, attrs in entry.items():
|
for name, attrs in obj.items():
|
||||||
emit[name] = [attr.to_json() for attr in attrs]
|
emit[name] = [attr.to_json() for attr in attrs]
|
||||||
return emit
|
return emit
|
||||||
|
|
||||||
def _emit_json(self, *, dn: str, entry: TDecoded) -> None:
|
def emit_json(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
|
||||||
if self.arguments.human:
|
emit = self.json(dn=dn, obj=obj)
|
||||||
emit = self.human(dn=dn, entry=entry)
|
json.dump(emit, file, ensure_ascii=False)
|
||||||
else:
|
print(file=file) # terminate output dicts by newline
|
||||||
emit = self.json(dn=dn, entry=entry)
|
|
||||||
json.dump(emit, sys.stdout, ensure_ascii=False)
|
|
||||||
print() # terminate output dicts by newline
|
|
||||||
|
|
||||||
def _emit_ldif(self, *, dn: str, entry: TDecoded) -> None:
|
def read_and_emit_json(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
|
||||||
print(f"dn: {dn}")
|
self.emit_json(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
|
||||||
for attrs in entry.values():
|
|
||||||
|
def emit_ldif(self, *, dn: str, obj: TDecoded, file: typing.IO[str] = sys.stdout) -> None:
|
||||||
|
print(f"dn: {dn}", file=file)
|
||||||
|
for attrs in obj.values():
|
||||||
for attr in attrs:
|
for attr in attrs:
|
||||||
attr.print()
|
attr.print(file=file)
|
||||||
print() # separate entries with newlines
|
print(file=file) # separate entries with newlines
|
||||||
|
|
||||||
def emit(self, *, dn: str, entry: TDecoded) -> None:
|
def read_and_emit_ldif(self, *, dn: str, entry: TEntry, file: typing.IO[str] = sys.stdout) -> None:
|
||||||
if self.arguments.human or self.arguments.json:
|
self.emit_ldif(dn=dn, obj=self.read(dn=dn, entry=entry), file=file)
|
||||||
self._emit_json(dn=dn, entry=entry)
|
|
||||||
else:
|
|
||||||
self._emit_ldif(dn=dn, entry=entry)
|
|
||||||
|
|
||||||
def handle(self, *, dn: str, entry: TEntry) -> None:
|
|
||||||
entry_attrs = self.read(dn=dn, entry=entry)
|
|
||||||
self.emit(dn=dn, entry=entry_attrs)
|
|
||||||
|
@ -23,14 +23,6 @@ class Arguments(argclasses.BaseArguments):
|
|||||||
columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
|
columns: list[str] = dataclasses.field(default_factory=list, metadata=argclasses.manual(_parser_add_attributes))
|
||||||
attributes: list[str] = dataclasses.field(default_factory=list)
|
attributes: list[str] = dataclasses.field(default_factory=list)
|
||||||
|
|
||||||
json: bool = dataclasses.field(
|
|
||||||
default=False,
|
|
||||||
metadata=argclasses.arg(help="Use full json output"),
|
|
||||||
)
|
|
||||||
human: bool = dataclasses.field(
|
|
||||||
default=False,
|
|
||||||
metadata=argclasses.arg(help="Use simple json output (join multiple values of one attribute)"),
|
|
||||||
)
|
|
||||||
human_separator: str = dataclasses.field(
|
human_separator: str = dataclasses.field(
|
||||||
default=", ",
|
default=", ",
|
||||||
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
|
metadata=argclasses.arg(help="Separator to join multiple values of one attribute with (default: %(default)r)"),
|
||||||
|
Loading…
Reference in New Issue
Block a user