From 3b5f698ff5218917a98e9a63340a86b946b3387c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Wed, 10 May 2023 19:52:44 +0200 Subject: [PATCH] add argument to postprocess steps and support index/slicing in DN-related hooks; document them --- README.md | 28 ++++++ src/ldaptool/_utils/dninfo.py | 19 ++-- src/ldaptool/decode/_postprocess.py | 139 ++++++++++++++++++++++++---- src/ldaptool/decode/arguments.py | 16 ++-- 4 files changed, 168 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index d1328ff..363d553 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,34 @@ CLI tool to query LDAP/AD servers * By default the first 1000 entries are shown, and it errors if there are more results * Use `--all` to show all results +## Virtual attributes + +`ldaptool` supports constructing new values from existing attributes by adding a `:` suffix (which can be chained apart from the length limit). + +* Some suffixes support an argument as `:[]`. +* A single integer as postprocess suffix limits the length of the value; it replaces the last character of the output with `…` if it cut something off. +* Multi-valued attributes generate multiple virtual attrites; each value is processed individually. (The values are joined afterwards for table output if needed.) + +### DN handling + +DNs are decoded into lists of lists of `(name, value)` pairs (the inner list usually contains exactly one entry). +Attributes with a `DC` name are considered part of the "domain", everything else belongs to the "path". +(Usually a DN will start with path segments and end with domain segments.) +The path is read from back to front. + +The following postprocess hooks are available: +* `domain`: extracts the domain as DNS FQDN (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com`) +* `path`: extracts the non-domain parts without names and separates them by `/` (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `Dep1/Someone`) +* `fullpath`: uses the `domain` as first segment in a path (`CN=Someone,OU=Dep1,DC=example,DC=com` becomes `example.com/Dep1/Someone`) +* `dnslice`: extracts a "slice" from a DN (outer list only); the result is still in DN format. + +`path`, `fullpath` and `dnslice` take an optional index/slice as argument, written in python syntax. +For `path` and `fullpath` this extracts only the given index/slice from the path (`fullpath` always includes the full FQDN as first segment), `dnslice` operates on the outer list of decoded (lists of) pairs: + +* `dn:dnslice[1:]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `OU=Dep1,DC=example,DC=com` +* `dn:fullpath[:-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `example.com/Dep1` +* `dn:path[-1]` on `dn: CN=Someone,OU=Dep1,DC=example,DC=com` returns `Someone` + ## Authentication, Protocol, Ports `ldaptool` always uses TLS for password based authentication, and SASL GSS-API over non-TLS for Kerberos ones. diff --git a/src/ldaptool/_utils/dninfo.py b/src/ldaptool/_utils/dninfo.py index ba429a2..9de14c9 100644 --- a/src/ldaptool/_utils/dninfo.py +++ b/src/ldaptool/_utils/dninfo.py @@ -33,19 +33,26 @@ class DNInfo: def domain(self) -> str: return ".".join(ava[1] for rdn in self.parts for ava in rdn if ava[0].lower() == "dc") - def _path(self, *, escape: typing.Callable[[str], str], sep: str) -> str: - return sep.join(escape(ava[1]) for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc") + def _path(self, *, escape: typing.Callable[[str], str], sep: str, selection: slice = slice(None)) -> str: + rev_flattened = [ava[1] for rdn in reversed(self.parts) for ava in rdn if ava[0].lower() != "dc"] + return sep.join(value for value in rev_flattened[selection]) + + def sliced_path(self, selection: slice, /) -> str: + return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/", selection=selection) @functools.cached_property def path(self) -> str: - return self._path(escape=lambda value: _escape_backslash(value, special="/"), sep="/") + return self.sliced_path(slice(None)) - @property - def full_path(self) -> str: + def sliced_full_path(self, selection: slice, /) -> str: domain = self.domain - path = self.path + path = self.sliced_path(selection) if not path: return self.domain if not domain: return self.path return f"{domain}/{path}" + + @property + def full_path(self) -> str: + return self.sliced_full_path(slice(None)) diff --git a/src/ldaptool/decode/_postprocess.py b/src/ldaptool/decode/_postprocess.py index d7529e6..f7c213f 100644 --- a/src/ldaptool/decode/_postprocess.py +++ b/src/ldaptool/decode/_postprocess.py @@ -2,6 +2,9 @@ from __future__ import annotations import abc import dataclasses +import typing + +import ldap.dn from ldaptool._utils.dninfo import DNInfo @@ -14,6 +17,27 @@ class Step(abc.ABC): ... +def _args_to_slice(args: str) -> slice: + args = args.strip() + if not args: + return slice(None) + params: list[typing.Optional[int]] = [] + for arg in args.split(":"): + arg = arg.strip() + if arg: + params.append(int(arg)) + else: + params.append(None) + if len(params) == 1: + assert isinstance(params[0], int) + ndx = params[0] + if ndx == -1: + return slice(ndx, None) # from last element to end - still exactly one element + # this doesn't work for ndx == -1: slice(-1, 0) is always empty. otherwise it should return [ndx:][:1]. + return slice(ndx, ndx + 1) + return slice(*params) + + @dataclasses.dataclass(slots=True) class MaxLength(Step): limit: int @@ -26,6 +50,10 @@ class MaxLength(Step): @dataclasses.dataclass(slots=True) class DNDomain(Step): + def __init__(self, args: str) -> None: + if args: + raise ValueError(":domain doesn't support an argument") + def step(self, value: str) -> str: try: dninfo = DNInfo(dn=value) @@ -37,30 +65,57 @@ class DNDomain(Step): @dataclasses.dataclass(slots=True) class DNPath(Step): + path_slice: slice + + def __init__(self, args: str) -> None: + self.path_slice = _args_to_slice(args) + def step(self, value: str) -> str: try: dninfo = DNInfo(dn=value) except Exception: # not a valid DN -> no processing return value - return dninfo.path + return dninfo.sliced_path(self.path_slice) @dataclasses.dataclass(slots=True) class DNFullPath(Step): + path_slice: slice + + def __init__(self, args: str) -> None: + self.path_slice = _args_to_slice(args) + def step(self, value: str) -> str: try: dninfo = DNInfo(dn=value) except Exception: # not a valid DN -> no processing return value - return dninfo.full_path + return dninfo.sliced_full_path(self.path_slice) -_STEPS = { - "domain": DNDomain(), - "path": DNPath(), - "fullpath": DNFullPath(), +@dataclasses.dataclass(slots=True) +class DNSlice(Step): + slice: slice + + def __init__(self, args: str) -> None: + self.slice = _args_to_slice(args) + + def step(self, value: str) -> str: + try: + dninfo = DNInfo(dn=value) + except Exception: + # not a valid DN -> no processing + return value + return ldap.dn.dn2str(dninfo.parts[self.slice]) # type: ignore + + +_STEPS: dict[str, typing.Callable[[str], Step]] = { + "domain": DNDomain, + "path": DNPath, + "fullpath": DNFullPath, + "dnslice": DNSlice, } @@ -78,19 +133,63 @@ class PostProcess: return value -def parse_steps(steps: list[str]) -> PostProcess: - max_len = 0 - try: - max_len = int(steps[-1]) - steps.pop() - except ValueError: - pass - result = [] - for step in steps: - step_i = _STEPS.get(step, None) +def parse_steps(steps: str) -> PostProcess: + result: list[Step] = [] + + cur_id_start = 0 + cur_args_start = -1 + current_id = "" + current_args = "" + count_brackets = 0 + step_done = False + + def handle_step() -> None: + nonlocal cur_id_start, cur_args_start, current_id, current_args, step_done + assert step_done + + step_i = _STEPS.get(current_id, None) if step_i is None: - raise InvalidStep(f"Unknown post-processing step {step!r}") - result.append(step_i) - if max_len: - result.append(MaxLength(max_len)) + try: + max_len = int(current_id) + result.append(MaxLength(max_len)) + except ValueError: + raise InvalidStep(f"Unknown post-processing step {current_id!r}") + else: + result.append(step_i(current_args)) + + cur_id_start = pos + 1 + cur_args_start = -1 + current_id = "" + current_args = "" + step_done = False + + for pos, char in enumerate(steps): + if step_done: + if char != ":": + raise InvalidStep(f"Require : after step, found {char!r} at pos {pos}") + handle_step() + elif char == "[": + if count_brackets == 0: + # end of identifier + current_id = steps[cur_id_start:pos] + cur_args_start = pos + 1 + count_brackets += 1 + elif char == "]": + count_brackets -= 1 + if count_brackets == 0: + current_args = steps[cur_args_start:pos] + step_done = True + elif count_brackets: + continue + elif not char.isalnum(): + raise InvalidStep(f"Expecting either alphanumeric, ':' or '[', got {char!r} at {pos}") + + if not step_done: + current_id = steps[cur_id_start:] + if current_id: + step_done = True + + if step_done: + handle_step() + return PostProcess(result) diff --git a/src/ldaptool/decode/arguments.py b/src/ldaptool/decode/arguments.py index 90e324e..eff83b0 100644 --- a/src/ldaptool/decode/arguments.py +++ b/src/ldaptool/decode/arguments.py @@ -57,20 +57,20 @@ class Arguments(argclasses.BaseArguments): self.columns_keys.append(column) if column == "dndomain": - self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["domain"]) + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("domain") attributes_set.add("dn") elif column == "dnpath": - self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["path"]) + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("path") attributes_set.add("dn") elif column == "dnfullpath": - self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps(["fullpath"]) + self.post_process.setdefault("dn", {})[column] = _postprocess.parse_steps("fullpath") attributes_set.add("dn") else: - step_names = column.split(":") - attributes_set.add(step_names[0]) - if len(step_names) > 1: - source = step_names.pop(0) - self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(step_names) + col_parts = column.split(":", maxsplit=1) + attributes_set.add(col_parts[0]) + if len(col_parts) == 2: + source, steps = col_parts + self.post_process.setdefault(source, {})[column] = _postprocess.parse_steps(steps) if all_attributes: self.attributes = []