From 54b6d4d0b5e07f628682b2a88172fd00ffeaa396 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Thu, 12 Jan 2023 10:09:18 +0100 Subject: [PATCH] initial public commit --- .flake8 | 9 + .gitignore | 1 + LICENSE | 19 + README.md | 124 ++++ check-lints.sh | 24 + pqm | 1762 ++++++++++++++++++++++++++++++++++++++++++++++ pqm.example.yaml | 14 + 7 files changed, 1953 insertions(+) create mode 100644 .flake8 create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100755 check-lints.sh create mode 100755 pqm create mode 100644 pqm.example.yaml diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..969aa8d --- /dev/null +++ b/.flake8 @@ -0,0 +1,9 @@ +[flake8] +# E266 too many leading '#' for block comment [ I like marking disabled code blocks with '### ' ] +# E402 module level import not at top of file [ usually on purpose. might use individual overrides instead? ] +# E701 multiple statements on one line [ still quite readable in short forms ] +# E713 test for membership should be ‘not in’ [ disagree: want `not a in x` ] +# E714 test for object identity should be 'is not' [ disagree: want `not a is x` ] +# W503 line break before binary operator [ gotta pick one way ] +extend-ignore = E266,E402,E701,E713,E714,W503 +max-line-length = 140 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5ceb386 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +venv diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7fd29c8 --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright © Universität Stuttgart + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..0dc6a74 --- /dev/null +++ b/README.md @@ -0,0 +1,124 @@ +# Postfix (cluster) queue manager + +CLI tool to manage postfix queues (optionally across multiple hosts through ssh). + +It uses [trio](https://trio.readthedocs.io/en/stable/) for asynchronous operations (i.e. parallel handling of many nodes in a cluster). + +It also has proper readline support with history of commands. + +## Workflow + +Start with `health` to get a feeling for the queue state. + +Show the list of deferred mails with `list`, or show all mails from a certain user (maybe `health` showed a suspicious user) with `list from bob@example.com`. + +Note that `list` (contrary to `list-verbose`) only shows the last (remaining) recipient of a mail (and how many targets are remaining). + +Build up filters to match only certain mails - e.g. 
start with `select from MAILER-DAEMON` to search for bounces, and then limit those to no-reply targets with `select to ~reply`. Use `list` or `list-verbose` to see which mails remain, and what the problem is.
+
+Use `show mailid` to show headers of suspicious emails, or `show -hb mailid` to show the body too if really necessary.
+
+Use `hold` to hold all currently selected mails (e.g. spam/...), and then delete them later with `delete`. Or use `delete mailid` to delete specific mails directly.
+
+Use `expire` to return mails to the sender (because you think the error from `list-verbose` is permanent, even if postfix doesn't know it yet).
+
+`delete` and `expire` operations prompt for confirmation after showing affected mails.
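+
+A short session following this workflow might look like this (queue IDs and output are illustrative):
+
+```
+# select from MAILER-DAEMON
+# select to ~no-?reply
+# current
+select from 'mailer-daemon'
+select to ~'no-?reply'
+# list
+9C40D1A2B3          12034 Thu Jan 12 09:15:02 MAILER-DAEMON    (Targets: 1, last: no-reply@example.net)
+# hold
+# delete
+9C40D1A2B3!         12034 Thu Jan 12 09:15:02 MAILER-DAEMON    (Targets: 1, last: no-reply@example.net)
+Really delete those mails (y/N)? y
+```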
+
+## List of commands
+
+There is an interactive help command.
+
+```
+# help
+clear                          Clear all filters
+clear <args>                   Clear filter with given index (zero based; first filter is at index 0)
+current                        Show current filters
+delete                         Delete selected mails from the hold queue
+delete <args>                  Delete mails with given IDs from the queues
+delete-all                     Delete selected mails from all queues
+exit                           Exit pqm (or press Ctrl-D on empty prompt)
+expire                         Expire mails and flush them (return error to sender).
+expire <args>                  Expire mails (return error to sender) with given IDs and flush them.
+expire-noflush                 Expire mails (return error to sender on next delivery attempt).
+expire-noflush <args>          Expire mails (return error to sender on next delivery attempt) with given IDs.
+expire-release                 Expire mails and flush them (return error to sender); release if mails are on hold.
+expire-release <args>          Expire mails (return error to sender) with given IDs and flush them; release if mails are on hold.
+flush                          Flush (deferred) selected mails in the queue: attempt to deliver them
+flush <args>                   Flush (deferred) mails with given IDs: attempt to deliver them
+flush-all                      Flush the queue: attempt to deliver all queued mail.
+health                         Show generic stats for queues and overly active addresses
+help                           Show all available commands
+help <args>                    Show detailed help for a command
+hold                           Put selected mails on hold
+hold <args>                    Put mails with given IDs on hold
+list [<args>]                  List all mails (single line per mail); if no filter is configured hide active and hold queue
+list-all [<args>]              List all mails (including active + hold queue, single line per mail)
+list-verbose [<args>]          List all mails (verbose output); if no filter is configured hide active and hold queue
+list-verbose-all [<args>]      List all mails (including active + hold queue, verbose output)
+pop                            Remove last filter
+release                        Release mail that was put "on hold".
+release <args>                 Release mails with given IDs that were put "on hold".
+requeue                        Requeue mail (move to "maildrop"; get reprocessed)
+requeue <args>                 Requeue mail with given IDs (move to "maildrop"; get reprocessed)
+select <args>                  Add filter for mails that other commands work on
+show <args>                    usage: show [-b] [-e] [-h] ID [ID ...]
+```
+
+Available filter expressions are shown in the help for `select`:
+
+```
+# help select
+>>>> select <args>
+
+Add filter for mails that other commands work on
+
+Filter syntax:
+- `from bob@example.com`, `from alice@example.com,bob@example.com,@example.net`, `from "alice@example.com" "bob@example.com"`
+  Multiple addresses can be given to match any of them; either unquoted and comma separated, or quoted and whitespace
+  separated. In the unquoted case the (comma separated) pattern must not include any whitespace.
+  An address starting with `@` only checks the domain name and ignores the local part.
+- `from ~regex.*@example.(com|net)`, `from ~"regex.*@example.(com|net)" "other@example.com"`
+  To use regular expressions either put `~` in front of an unquoted pattern (need to repeat for each regex in a
+  comma separated pattern list) or before the quotes of a quoted pattern.
+- `to ...` and `address ...` (matching both to and from) work the same as `from ...`
+- `queue hold,deferred`
+  Comma separated list of queues a mail must be in. For single queue names `queue` can be omitted, e.g. `hold`.
+- `$expr1 and $expr2`
+- `$expr1 or $expr2` (`a and b or c` is parsed as `a and (b or c)`)
+- `($expr)`
+- `not $expr`
+```
+
+The select expressions work on the data provided by `postqueue -j` - which is why filtering by subject or other headers isn't supported.
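+
+For reference, `postqueue -j` prints one JSON object per queued mail (one object per line; wrapped here), roughly like this illustrative sample:
+
+```
+{"queue_name": "deferred", "queue_id": "9C40D1A2B3", "arrival_time": 1673514902,
+ "message_size": 12034, "sender": "bob@example.com", "recipients": [{"address":
+ "alice@example.net", "delay_reason": "connect to mx.example.net[192.0.2.1]:25: Connection timed out"}]}
+```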
+
+## Installation
+
+Dependencies:
+- python3, probably >= 3.10
+- [trio](https://trio.readthedocs.io/en/stable/)
+- [pyparsing](https://github.com/pyparsing/pyparsing/)
+- [PyYAML](https://github.com/yaml/pyyaml)
+
+On Debian: `apt install python3-trio python3-pyparsing python3-yaml`
+
+Put the pqm script into your path and make it executable.
+
+## Configuration
+
+Configuration is optional; by default it tries to use `~/.config/pqm.yaml` and `/etc/pqm.yaml` (picking the first one that exists).
+
+Also see `pqm.example.yaml` in this repository.
+
+### Source (hosts to load mails from)
+
+By default it will use the local queue; it probably needs to run as root for that.
+
+Use `remote-sources` in the config to use remote hosts instead ("cluster" mode). Each remote source has a unique alias that is used in the `pqm` output (by default the first DNS label in the hostname).
+
+`pqm` uses `ssh -oBatchMode=yes root@remotesource ...` to execute postfix tools on the remote nodes; this means you'll probably want public-key based authentication to the postfix nodes, and the server keys must already be trusted.
+
+You'll also want to make sure the ssh login is fast; `UseDNS no` (should be the default) on the server and ed25519 keys should help with that.
+
+## Queue IDs
+
+Each mail in the postfix queues has an ID; when using the local source `pqm` will use this ID directly. With remote sources it will prepend `alias/` to the ID.
diff --git a/check-lints.sh b/check-lints.sh
new file mode 100755
index 0000000..41b446a
--- /dev/null
+++ b/check-lints.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+set -e
+
+cd "$(dirname "$(readlink -f "$0")")"
+
+rc=0
+
+run() {
+	# remember last failure
+	if "$@"; then :; else rc=$?; fi
+}
+
+export PYTHONDONTWRITEBYTECODE=1
+
+if [ ! -d venv ]; then
+	virtualenv -p python3 venv
+	./venv/bin/pip install trio pyparsing PyYAML mypy flake8 trio-typing types-PyYAML
+fi
+
+run ./venv/bin/flake8 pqm
+run ./venv/bin/mypy pqm
+
+exit $rc
diff --git a/pqm b/pqm
new file mode 100755
index 0000000..5373f30
--- /dev/null
+++ b/pqm
@@ -0,0 +1,1762 @@
+#!/usr/bin/env python3
+
+# Copyright © Universität Stuttgart
+# LICENSE: MIT
+
+from __future__ import annotations
+
+import abc
+import argparse
+import atexit
+import contextlib
+import dataclasses
+import datetime
+import enum
+import functools
+import inspect
+import json
+import os
+import os.path
+import pathlib
+import shlex
+import pyparsing
+import re
+import readline
+import socket
+import subprocess
+import sys
+import traceback
+import trio
+import typing
+import yaml
+
+T = typing.TypeVar('T')
+
+
+@dataclasses.dataclass
+class Config:
+    # if no remote_sources are configured -> use local queue
+    remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
+
+    @staticmethod
+    def load(filename: str = '') -> Config:
+        if not filename:
+            default_paths = [
+                os.path.expanduser('~/.config/pqm.yaml'),
+                '/etc/pqm.yaml',
+            ]
+            for path in default_paths:
+                if os.path.exists(path):
+                    filename = path
+                    break
+            else:
+                # no configfile -> default config
+                return Config()
+        with open(filename) as file:
+            data = yaml.safe_load(file)
+        assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"
+        rs_list = data.get('remote-sources', [])
+        assert isinstance(rs_list, list), f"remote-sources must be a list of strings: {rs_list!r}"
+        remote_sources: dict[str, str] = {}
+        for entry in rs_list:
+            assert isinstance(entry, str), f"remote-sources entries must be strings: {entry!r}"
+            if ':' in entry:
+                alias, target = entry.split(':', maxsplit=1)
+            else:
+                target = entry
+                alias = entry.split('.', maxsplit=1)[0]
+            assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
+            if alias in remote_sources:
+                raise ValueError(f'Duplicate alias {alias!r} in remote-sources (have: {remote_sources[alias]!r}, new: {target!r})')
+            remote_sources[alias] = target
+        return Config(remote_sources=remote_sources)
+
+    def source(self) -> Source:
+        if self.remote_sources:
+            return RemoteSource(remotes=self.remote_sources)
+        else:
+            return LocalSource()
+
+
+def input_ack(prompt: str) -> str:
+    # don't want readline for simple acks (and input always uses readline)
+    sys.stdout.write(prompt)
+    sys.stdout.flush()
+    return sys.stdin.readline(16)
+
+
+def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.Callable[[pyparsing.ParseResults], T]:
+    # treat any exception thrown within a custom parser action as `ParseFatalException`
+    # (stops parsing immediately)
+    @functools.wraps(act)
+    def wrapped(tokens: pyparsing.ParseResults):
+        try:
+            return act(tokens)
+        except Exception as e:
+            raise pyparsing.ParseFatalException(str(e))
+
+    return wrapped
+
+
+class TrioParallelOrdered(typing.Generic[T]):
+    # used by trio_parallel_ordered below
+
+    def __init__(self, nursery: trio.Nursery, result_handler: typing.Callable[[T], typing.Awaitable[None]]) -> None:
+        # basic idea: for each task create a oneshot channel (capacity 1) to deliver result;
+        # put receiving channel into self._tx (-> ordered by task start time)
+        # in _handle_results: get result channel (still ordered), then wait for result
+        self.nursery = nursery
+        self.result_handler = result_handler
+        self.limit = trio.CapacityLimiter(8)
+        send_channel: 
trio.MemorySendChannel[trio.MemoryReceiveChannel[T]] + receive_channel: trio.MemoryReceiveChannel[trio.MemoryReceiveChannel[T]] + (send_channel, receive_channel) = trio.open_memory_channel(128) + self._tx = send_channel + + async def _handle_results() -> None: + item: trio.MemoryReceiveChannel[T] + async for item in receive_channel: + try: + result = await item.receive() + except trio.EndOfChannel: + continue + await self.result_handler(result) + + self.nursery.start_soon(_handle_results) + + async def close(self) -> None: + await self._tx.aclose() + + async def _spawn(self, job, args, kwargs, *, task_status=trio.TASK_STATUS_IGNORED) -> None: + tx: trio.MemorySendChannel[T] + rx: trio.MemoryReceiveChannel[T] + (tx, rx) = trio.open_memory_channel(1) + await self._tx.send(rx) + async with self.limit: + task_status.started() + async with tx: + result = await job(*args, **kwargs) + await tx.send(result) + + async def start(self, job, *args, **kwargs) -> None: + await self.nursery.start(self._spawn, job, args, kwargs) + + +# async def handle_result_of_some_job_function(result) -> None: ... +# async def some_job_function(foo) -> ResultType: ... +# async with trio_parallel_ordered(handle_result_of_some_job_function) as tpo: +# for foo in biglist: +# await tpo.start(some_job_function, foo) +@contextlib.asynccontextmanager +async def trio_parallel_ordered( + result_handler: typing.Callable[[T], typing.Awaitable[None]], +) -> typing.AsyncGenerator[TrioParallelOrdered[T], None]: + async with trio.open_nursery() as nursery: + tpo = TrioParallelOrdered(nursery=nursery, result_handler=result_handler) + yield tpo + await tpo.close() + + +@dataclasses.dataclass +class Recipient: + """Recipient in a postfix mail""" + address: str + # "error message" from last delivery attempt + delay_reason: str + + @staticmethod + def from_json(d: dict) -> Recipient: + return Recipient( + address=d['address'], + delay_reason=d.get('delay_reason', ''), + ) + + +class QueueName(enum.Enum): + MAILDROP = "maildrop" + HOLD = "hold" + INCOMING = "incoming" + ACTIVE = "active" + DEFERRED = "deferred" + CORRUPT = "corrupt" + + +ALL_QUEUE_NAMES: set[QueueName] = set(QueueName) + + +@dataclasses.dataclass +class Mail: + """Metadata for mail in postfix queue""" + queue_name: QueueName + queue_id: str + arrival_time: datetime.datetime + message_size: int + sender: str + recipients: list[Recipient] + # forced_expire: bool + + @staticmethod + def from_json(d: dict) -> Mail: + return Mail( + queue_name=QueueName(d['queue_name']), + queue_id=d['queue_id'], + arrival_time=datetime.datetime.fromtimestamp(d['arrival_time']), + message_size=d['message_size'], + sender=d['sender'], + recipients=[Recipient.from_json(recp) for recp in d['recipients']], + ) + + @staticmethod + def read_postqueue_json(data: str, id_prefix: str = '') -> list[Mail]: + queue = [] + decoder = json.JSONDecoder() + data = data.strip() + while data: + obj, end = decoder.raw_decode(data, 0) + data = data[end:].lstrip() + mail = Mail.from_json(obj) + mail.queue_id = id_prefix + mail.queue_id + queue.append(mail) + return queue + + +# abstract collection/cluster of postfix nodes (or just a single one) +class Source(abc.ABC): + # list of server names in collection + @abc.abstractmethod + def server_list(self) -> list[str]: + raise NotImplementedError() + + # split "list" of input elements into chunks + @typing.final + @staticmethod + def chunks(data: typing.Iterable[T], *, chunk_size: int = 512) -> typing.Generator[list[T], None, None]: + if isinstance(data, list): + while 
data:
+                yield data[:chunk_size]
+                data = data[chunk_size:]
+        else:
+            chunk = []
+            for item in data:
+                chunk.append(item)
+                if len(chunk) >= chunk_size:
+                    yield chunk
+                    chunk = []
+            if chunk:
+                yield chunk
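+
+    # usage sketch (not part of the tool's behavior):
+    #   list(Source.chunks([1, 2, 3, 4, 5], chunk_size=2)) == [[1, 2], [3, 4], [5]]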
+
+    # retrieve list of mails in all queues
+    @abc.abstractmethod
+    async def get_list(self) -> list[Mail]:
+        raise NotImplementedError()
+
+    # basically postcat, and flags are used as command line arguments to postcat
+    # returns stdout and stderr of postcat
+    @abc.abstractmethod
+    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    async def flush_all(self) -> None:
+        """Flush the queue: attempt to deliver all queued mail."""
+        # postqueue -f
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
+        """Schedule immediate delivery of deferred mail with the specified queue IDs."""
+        # postqueue -i MSG1 -i MSG2 -i ...
+        raise NotImplementedError()
+
+    # abstract way to call `postsuper`, action is the command line flag for the action to take
+    @abc.abstractmethod
+    async def _postsuper(
+        self,
+        action: str,
+        queue_ids: typing.Iterable[str],
+        *,
+        from_queues: set[QueueName] | tuple[()] = (),
+    ):
+        raise NotImplementedError()
+
+    async def delete(self, queue_ids: typing.Iterable[str], *, from_queues: set[QueueName] | tuple[()] = ()) -> None:
+        """
+        Delete the messages with the given queue IDs from the named mail queue(s) (default: hold, incoming, active and deferred).
+        """
+        await self._postsuper('-d', queue_ids=queue_ids, from_queues=from_queues)
+
+    async def hold(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
+        """
+        Put mail "on hold" so that no attempt is made to deliver it.
+        Move the messages with the given queue IDs from the named mail queue(s) (default: incoming, active and deferred) to the hold queue.
+        """
+        await self._postsuper('-h', queue_ids=queue_ids, from_queues=from_queues)
+
+    async def release(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
+        """
+        Release mail that was put "on hold".
+        Move the messages with the given queue IDs from the named mail queue(s) (default: hold) to the deferred queue.
+        """
+        await self._postsuper('-H', queue_ids=queue_ids, from_queues=from_queues)
+
+    async def requeue(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
+        """
+        Requeue the messages with the given queue IDs from the named mail queue(s) (default: hold, incoming, active and deferred).
+        """
+        await self._postsuper('-r', queue_ids=queue_ids, from_queues=from_queues)
+
+    async def expire(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
+        """
+        Expire the messages with the given queue IDs from the named mail queue(s) (default: hold, incoming, active and deferred).
+        """
+        await self._postsuper('-e', queue_ids=queue_ids, from_queues=from_queues)
+
+    async def expire_release(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
+        """
+        Expire and release (if in hold) the messages with the given queue IDs from the named mail queue(s)
+        (default queues: hold, incoming, active and deferred).
+        """
+        await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)
+
+
+class LocalSource(Source):
+    def server_list(self) -> list[str]:
+        return [socket.gethostname()]
+
+    async def get_list(self) -> list[Mail]:
+        res: subprocess.CompletedProcess = await trio.run_process(
+            ['/usr/sbin/postqueue', '-j'],
+            capture_stdout=True,
+        )
+        queue = Mail.read_postqueue_json(res.stdout.decode('utf-8'))
+        queue = sorted(queue, key=lambda item: item.arrival_time)  # sort by incoming date, newest last
+        return queue
+
+    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
+        """stdout and stderr of postcat; stdout is None if we got non-zero status"""
+        if not re.fullmatch(r'[A-Z0-9]+', queue_id):
+            return (None, f"Invalid queue id: {queue_id!r} - expected /[A-Z0-9]+/".encode())
+        res: subprocess.CompletedProcess = await trio.run_process(
+            ['/usr/sbin/postcat'] + list(flags) + ['-q', queue_id],
+            capture_stdout=True,
+            capture_stderr=True,
+            check=False,
+        )
+        if res.returncode != 0:
+            # expect some error was printed to stderr
+            return (None, res.stderr)
+        return (res.stdout, res.stderr)
+
+    async def flush_all(self) -> None:
+        await trio.run_process(['/usr/sbin/postqueue', '-f'])
+
+    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
+        # postqueue only deals with single messages, because the flush daemon
+        # only accepts a single (flush) request per connection...
+        for msg in queue_ids:
+            await trio.run_process(['/usr/sbin/postqueue', '-i', msg])
+
+    async def _postsuper(
+        self,
+        action: str,
+        queue_ids: typing.Iterable[str],
+        *,
+        from_queues: set[QueueName] | tuple[()] = (),
+    ):
+        cmd = ['/usr/sbin/postsuper', action, '-']
+        if isinstance(from_queues, set):
+            from_queues = from_queues - set((QueueName.CORRUPT,))  # remove "invalid" queue names
+            if from_queues:
+                cmd += [qn.value for qn in from_queues]
+            else:
+                # empty set of queues allowed -> no queue allowed
+                return
+        data = ''.join(f'{msg}\n' for msg in queue_ids)
+        await trio.run_process(cmd, stdin=data.encode())
+
+
+class RemoteSource(Source):
+    def __init__(self, remotes: dict[str, str]) -> None:
+        super().__init__()
+        self.remotes = remotes
+
+    def server_list(self) -> list[str]:
+        return list(self.remotes)
+
+    # split "remote mail id" into alias for remote and id on remote system
+    @staticmethod
+    def decode_queue_id(queue_id: str, *, allow_all: bool = False) -> tuple[str, str]:
+        parts = queue_id.split('/', maxsplit=1)
+        if len(parts) < 2:
+            raise ValueError(f"Invalid mail id: {queue_id!r} - expected HOST/localid")
+        host, queue_id = parts
+        if not re.fullmatch(r'[A-Z0-9]+', queue_id):
+            raise ValueError(f"Invalid queue id: {queue_id!r} - expected /[A-Z0-9]+/")
+        if not allow_all and queue_id == 'ALL':
+            raise ValueError(f"Invalid queue id: {queue_id!r} - ALL not allowed")
+        return (host, queue_id)
+
+    async def get_list(self) -> list[Mail]:
+        queue: list[Mail] = []
+
+        async def get_host(remote_id: str, remote_host: str) -> None:
+            remote_id_prefix = remote_id + '/'
+            res: subprocess.CompletedProcess = await trio.run_process(
+                ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, '/usr/sbin/postqueue -j'],
+                capture_stdout=True,
+            )
+            queue.extend(Mail.read_postqueue_json(res.stdout.decode('utf-8'), id_prefix=remote_id_prefix))
+
+        async with trio.open_nursery() as nursery:
+            for remote_id, remote_host in self.remotes.items():
+                nursery.start_soon(get_host, remote_id, remote_host)
+
+        queue = sorted(queue, key=lambda 
item: item.arrival_time) # sort by incoming date, newest last + return queue + + async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]: + try: + host, queue_id = self.decode_queue_id(queue_id) + except ValueError as e: + return (None, str(e).encode()) + if not host in self.remotes: + return (None, f"Unknown remote host: {host!r}".encode()) + remote_host = self.remotes[host] + cmd = ['/usr/sbin/postcat'] + list(flags) + ['-q', queue_id] + res: subprocess.CompletedProcess = await trio.run_process( + ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)], + capture_stdout=True, + capture_stderr=True, + check=False, + ) + if res.returncode != 0: + # expect some error was printed to stderr + return (None, res.stderr) + return (res.stdout, res.stderr) + + async def flush_all(self) -> None: + async def flush_host(remote_host: str) -> None: + await trio.run_process( + ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, '/usr/sbin/postqueue -f'], + ) + + async with trio.open_nursery() as nursery: + for _, remote_host in self.remotes.items(): + nursery.start_soon(flush_host, remote_host) + + # group "long ids" (with alias/ prefix) by alias (remote host) + def aggregate_by_host(self, queue_ids: typing.Iterable[str], *, allow_all: bool) -> dict[str, list[str]]: + by_host: dict[str, list[str]] = {} + for long_queue_id in queue_ids: + if long_queue_id == 'ALL': + if not allow_all: + raise ValueError(f"Invalid queue id: {long_queue_id!r} - ALL not allowed") + for host in self.remotes: + by_host.setdefault(host, []).append('ALL') + continue + host, queue_id = self.decode_queue_id(long_queue_id, allow_all=allow_all) + if not host in by_host: + by_host[host] = [queue_id] + else: + by_host[host].append(queue_id) + for host in list(by_host): + if not host in self.remotes: + ids = by_host.pop(host) + raise ValueError(f"Unknown remote host {host!r}, used for {len(ids)} queue ids") + return by_host + + async def flush(self, queue_ids: typing.Iterable[str]) -> None: + by_host = self.aggregate_by_host(queue_ids=queue_ids, allow_all=False) + + async def run_host(remote_host: str, ids: list[str]) -> None: + for run in self.chunks(ids): + # postqueue only deals with single messages, because the flush daemon + # only accepts a single (flush) request per connection... 
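+                # the remote command generated below looks like, e.g. (IDs illustrative):
+                #   for msg in 9C40D1A2B3 7E81F0C4D5; do /usr/sbin/postqueue -i "${msg}"; done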
+ cmd = 'for msg in ' + shlex.join(run) + '; do /usr/sbin/postqueue -i "${msg}"; done' + await trio.run_process(['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, cmd]) + + async with trio.open_nursery() as nursery: + for host, ids in by_host.items(): + remote_host = self.remotes[host] + nursery.start_soon(run_host, remote_host, ids) + + async def _postsuper( + self, + action: str, + queue_ids: typing.Iterable[str], + *, + from_queues: set[QueueName] | tuple[()] = (), + ) -> None: + cmd = ['/usr/sbin/postsuper', action, '-'] + if isinstance(from_queues, set): + from_queues = from_queues - set((QueueName.CORRUPT,)) # remove "invalid" queue names + if from_queues: + cmd += [qn.value for qn in from_queues] + else: + # empty set of queues allowed -> no queue allowed + return + by_host = self.aggregate_by_host(queue_ids=queue_ids, allow_all=True) + + async def run_host(remote_host: str, data: str) -> None: + await trio.run_process(['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)], stdin=data.encode()) + + async with trio.open_nursery() as nursery: + for host, ids in by_host.items(): + remote_host = self.remotes[host] + data = ''.join(f'{msg}\n' for msg in ids) + nursery.start_soon(run_host, remote_host, data) + + +class UserActivityStats: + # find most active address for `health` command + + def __init__(self, cut_off: int = 10) -> None: + # count recipients individually for each sender, and recipients just once + self.count_mails: dict[str, int] = {} + self.related_mails: dict[str, list[Mail]] = {} + self.cut_off = cut_off + + def count(self, item: Mail) -> None: + if QueueName.HOLD == item.queue_name: + return + sender = item.sender.lower() + if item.sender != 'MAILER-DAEMON': + if not sender in self.count_mails: + self.count_mails[sender] = 0 + self.related_mails[sender] = [] + self.count_mails[sender] += len(item.recipients) + self.related_mails[sender].append(item) + recipients = { + recp.address.lower() + for recp in item.recipients + } + for recp in recipients: + if sender == recp: continue # count user just once per mail + if not recp in self.count_mails: + self.count_mails[recp] = 0 + self.related_mails[recp] = [] + self.count_mails[recp] += 1 + self.related_mails[recp].append(item) + + def prepare(self) -> None: + for (user, count) in list(self.count_mails.items()): + if count < self.cut_off: + self.count_mails.pop(user) + + def extract_highest_user(self) -> None | tuple[str, int, list[Mail]]: + max_count = 0 + if not self.count_mails: + return None + user = '' + for (loop_user, count) in list(self.count_mails.items()): + if count > max_count: + user = loop_user + max_count = count + activity = self.count_mails.pop(user) + mails = self.related_mails.pop(user) + for item in mails: + sender = item.sender.lower() + if user != sender and sender in self.count_mails: + self.count_mails[sender] -= len(item.recipients) + if self.count_mails[sender] < self.cut_off: + self.count_mails.pop(sender) + recipients = { + recp.address.lower() + for recp in item.recipients + } + for recp in recipients: + if sender == recp: continue # count user just once per mail + if user != recp and recp in self.count_mails: + self.count_mails[recp] -= 1 + if self.count_mails[recp] < self.cut_off: + self.count_mails.pop(recp) + return (user, activity, mails) + + +class Filter(abc.ABC): + """abstract base class for filter expressions""" + + def __init__(self) -> None: + pass + + @abc.abstractmethod + def matches(self, mail: Mail) -> bool: + """Whether mail matches the filter""" + 
raise NotImplementedError()
+
+    @abc.abstractmethod
+    def possible_queues(self) -> set[QueueName]:
+        """Which queues matched mails are possible from (all unless explicitly restricted)"""
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def __repr__(self) -> str:
+        raise NotImplementedError()
+
+    @typing.final
+    def filter(self, mails: typing.Iterable[Mail]) -> typing.Generator[Mail, None, None]:
+        """Filter list of mails"""
+        for mail in mails:
+            if self.matches(mail):
+                yield mail
+
+    @typing.final
+    def filter_ids(self, mails: typing.Iterable[Mail]) -> typing.Generator[str, None, None]:
+        """Filter list of mails but only return mail ids"""
+        for mail in mails:
+            if self.matches(mail):
+                yield mail.queue_id
+
+    def __and__(self, other: typing.Any) -> Filter:
+        """Combine two filters into one that only matches mails that matched both"""
+        if not isinstance(other, Filter):
+            return NotImplemented
+        if other is TRUE_FILTER:
+            return self
+        if other is FALSE_FILTER:
+            return other
+        if isinstance(other, AndFilter):
+            return AndFilter([self] + other.expressions)
+        return AndFilter([self, other])
+
+    def __or__(self, other: typing.Any) -> Filter:
+        """Combine two filters into one that matches mails that matched at least one"""
+        if not isinstance(other, Filter):
+            return NotImplemented
+        if other is TRUE_FILTER:
+            return other
+        if other is FALSE_FILTER:
+            return self
+        if isinstance(other, OrFilter):
+            return OrFilter([self] + other.expressions)
+        return OrFilter([self, other])
+
+    def __invert__(self) -> Filter:
+        """Create filter that exactly matches mail that didn't match the original filter"""
+        return NotFilter(self)
+
+
+# abstract base class for filters that don't need (...) around in representations of members in combined filters
+class SingleExprFilter(Filter):
+    pass
+
+
+class FalseFilter(SingleExprFilter):
+    """constant false - matches no mail"""
+    def __repr__(self) -> str:
+        return '0'
+
+    def matches(self, mail: Mail) -> bool:
+        return False
+
+    def possible_queues(self) -> set[QueueName]:
+        return set()
+
+    def __and__(self, other: typing.Any) -> Filter:
+        if not isinstance(other, Filter):
+            return NotImplemented
+        return self
+
+    def __or__(self, other: typing.Any) -> Filter:
+        if not isinstance(other, Filter):
+            return NotImplemented
+        return other
+
+    def __invert__(self) -> Filter:
+        return TRUE_FILTER
+
+
+class TrueFilter(SingleExprFilter):
+    """constant true - matches all mail"""
+    def __repr__(self) -> str:
+        return '1'
+
+    def matches(self, mail: Mail) -> bool:
+        return True
+
+    def possible_queues(self) -> set[QueueName]:
+        return ALL_QUEUE_NAMES
+
+    def __and__(self, other: typing.Any) -> Filter:
+        if not isinstance(other, Filter):
+            return NotImplemented
+        return other
+
+    def __or__(self, other: typing.Any) -> Filter:
+        if not isinstance(other, Filter):
+            return NotImplemented
+        return self
+
+    def __invert__(self) -> Filter:
+        return FALSE_FILTER
+
+
+FALSE_FILTER = FalseFilter()
+TRUE_FILTER = TrueFilter()
+
+
+class AndFilter(Filter):
+    def __init__(self, expressions: list[Filter] | tuple[()] = ()) -> None:
+        super().__init__()
+        self.expressions: list[Filter] = expressions or []
+
+    def matches(self, mail: Mail) -> bool:
+        for expr in self.expressions:
+            if not expr.matches(mail):
+                return False
+        return True
+
+    def possible_queues(self) -> set[QueueName]:
+        res = set(ALL_QUEUE_NAMES)  # clone set
+        for expr in self.expressions:
+            res &= expr.possible_queues()
+        return res
+
+    def __repr__(self) -> str:
+        if len(self.expressions) == 0:
+            return '1'
+        if 
len(self.expressions) == 1: + return repr(self.expressions[0]) + subexprs = [] + for e in self.expressions: + if isinstance(e, (SingleExprFilter, AndFilter)): + subexprs.append(repr(e)) + else: + subexprs.append(f'({e!r})') + return ' and '.join(subexprs) + + def __and__(self, other: typing.Any) -> AndFilter: + if not isinstance(other, Filter): + return NotImplemented + if isinstance(other, AndFilter): + return AndFilter(self.expressions + other.expressions) + return AndFilter(self.expressions + [other]) + + @staticmethod + @parser_action + def from_parse_results(toks: pyparsing.ParseResults) -> Filter: + assert len(toks) == 1 + inner_toks = list(toks[0]) + assert len(inner_toks) % 2 == 1, f"need odd number of tokens in {toks}" + token = inner_toks.pop(0) + assert isinstance(token, Filter) + while inner_toks: + assert inner_toks.pop(0) == 'and', f"Missing 'and' in {toks}" + next_token = inner_toks.pop(0) + assert isinstance(next_token, Filter), f"Non-filter in {toks}" + token = token & next_token + return token + + +class OrFilter(Filter): + def __init__(self, expressions: list[Filter] | tuple[()] = ()) -> None: + super().__init__() + self.expressions: list[Filter] = expressions or [] + + def matches(self, mail: Mail) -> bool: + for expr in self.expressions: + if expr.matches(mail): + return True + return False + + def possible_queues(self) -> set[QueueName]: + res = set() + for expr in self.expressions: + res |= expr.possible_queues() + return res + + def __repr__(self) -> str: + if len(self.expressions) == 0: + return '0' + if len(self.expressions) == 1: + return repr(self.expressions[0]) + subexprs = [] + for e in self.expressions: + if isinstance(e, (SingleExprFilter, OrFilter)): + subexprs.append(repr(e)) + else: + subexprs.append(f'({e!r})') + return ' or '.join(subexprs) + + def __or__(self, other: typing.Any) -> OrFilter: + if not isinstance(other, Filter): + return NotImplemented + if isinstance(other, OrFilter): + return OrFilter(self.expressions + other.expressions) + return OrFilter(self.expressions + [other]) + + @staticmethod + @parser_action + def from_parse_results(toks: pyparsing.ParseResults) -> Filter: + assert len(toks) == 1 + inner_toks = list(toks[0]) + assert len(inner_toks) % 2 == 1, f"need odd number of tokens in {toks}" + token = inner_toks.pop(0) + assert isinstance(token, Filter) + while inner_toks: + assert inner_toks.pop(0) == 'or', f"Missing 'or' in {toks}" + next_token = inner_toks.pop(0) + assert isinstance(next_token, Filter), f"Non-filter in {toks}" + token = token | next_token + return token + + +class NotFilter(SingleExprFilter): + def __init__(self, expression: Filter) -> None: + super().__init__() + self.expression = expression + + def matches(self, mail: Mail) -> bool: + return not self.expression.matches(mail) + + def possible_queues(self) -> set[QueueName]: + # negation could match mails from any filter! 
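+        # e.g. `not from bob@example.com` can match mail in every queue, while
+        # `not hold` (handled below) can only match mail outside the hold queue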
+        if isinstance(self.expression, QueueFilter):
+            # in this case we actually know the negation
+            return ALL_QUEUE_NAMES - self.expression.select
+        # otherwise: everything is possible
+        return ALL_QUEUE_NAMES
+
+    def __repr__(self) -> str:
+        if isinstance(self.expression, SingleExprFilter):
+            return f'not {self.expression!r}'
+        else:
+            return f'not ({self.expression!r})'
+
+    def __invert__(self) -> Filter:
+        return self.expression
+
+    @staticmethod
+    @parser_action
+    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
+        assert len(toks) == 1
+        toks = toks[0]
+        assert len(toks) == 2
+        assert toks[0] == 'not'
+        assert isinstance(toks[1], Filter)
+        return ~toks[1]
+
+
+class QueueFilter(SingleExprFilter):
+    """match mails based on queue they are in"""
+    def __init__(self, select: list[QueueName]) -> None:
+        self.select = set(select)
+
+    def matches(self, mail: Mail) -> bool:
+        return mail.queue_name in self.select
+
+    def possible_queues(self) -> set[QueueName]:
+        return self.select
+
+    def __repr__(self) -> str:
+        if not self.select:
+            return '0'
+        return f'queue {",".join(qn.value for qn in self.select)}'
+
+    def __and__(self, other: typing.Any) -> Filter:
+        if not isinstance(other, Filter):
+            return NotImplemented
+        if isinstance(other, QueueFilter):
+            select = self.select & other.select
+            if select:
+                return QueueFilter(list(select))
+            else:
+                return FALSE_FILTER
+        return super().__and__(other)
+
+    def __or__(self, other: typing.Any) -> Filter:
+        if not isinstance(other, Filter):
+            return NotImplemented
+        if isinstance(other, QueueFilter):
+            select = self.select | other.select
+            return QueueFilter(list(select))
+        return super().__or__(other)
+
+    # def __invert__(self) -> Filter:
+    #     select = ALL_QUEUE_NAMES - self.select
+    #     return QueueFilter(list(select))
+
+    @staticmethod
+    @parser_action
+    def from_parse_results(toks: pyparsing.ParseResults) -> QueueFilter:
+        return QueueFilter([QueueName(t) for t in toks])
+
+
+class AddressSelector(enum.Enum):
+    # addresses of a mail to extract
+    SENDER = "from"
+    RECIPIENT = "to"
+    ANY = "address"
+
+    def extract(self, mail: Mail) -> typing.Generator[str, None, None]:
+        if self == AddressSelector.SENDER or self == AddressSelector.ANY:
+            yield mail.sender
+        if self == AddressSelector.RECIPIENT or self == AddressSelector.ANY:
+            for recp in mail.recipients:
+                yield recp.address
+
+
+class BaseAddressPattern(abc.ABC):
+    # abstract base class of patterns to use to match a mail address
+    # subclasses match either: full address, domain part, regex
+    def __init__(self) -> None:
+        pass
+
+    @abc.abstractmethod
+    def __repr__(self) -> str:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def matches(self, address: str) -> bool:
+        raise NotImplementedError()
+
+    @staticmethod
+    @parser_action
+    def from_unquoted_parse_results(toks: pyparsing.ParseResults) -> list[BaseAddressPattern]:
+        addresses: str = toks[0]
+        patterns: list[BaseAddressPattern] = []
+        for address in addresses.strip().split(','):
+            address = address.strip()
+            if not address:
+                continue
+            if address.startswith('~'):
+                patterns.append(AddressRegexMatch(address[1:]))
+            elif address.startswith('@'):
+                patterns.append(AddressDomainPattern(address[1:]))
+            else:
+                patterns.append(AddressPattern(address))
+        return patterns
+
+
+class AddressPattern(BaseAddressPattern):
+    # match full address exactly
+    def __init__(self, address: str) -> None:
+        super().__init__()
+        self.address = address.lower().removesuffix('.')
+
+    def matches(self, address: str) -> bool:
+        return 
address.lower().removesuffix('.') == self.address + + def __repr__(self) -> str: + return f'{self.address!r}' + + @staticmethod + @parser_action + def from_parse_results(toks: pyparsing.ParseResults) -> BaseAddressPattern: + address: str = toks[0] + if address.startswith('@'): + return AddressDomainPattern(address[1:]) + else: + return AddressPattern(address) + + +class AddressDomainPattern(BaseAddressPattern): + # match address by domain + def __init__(self, domain: str) -> None: + super().__init__() + self.domain = domain.lower().removesuffix('.') + + def matches(self, address: str) -> bool: + parts = address.rsplit('@', maxsplit=1) + if len(parts) == 1: + # "MAILER-DAEMON" + return False + return parts[1].lower().removesuffix('.') == self.domain + + def __repr__(self) -> str: + return f'{"@" + self.domain!r}' + + +class AddressRegexMatch(BaseAddressPattern): + # match address by regex + def __init__(self, address: str | re.Pattern[str]) -> None: + super().__init__() + if isinstance(address, re.Pattern): + self.address = address + else: + self.address = re.compile(address) + + def matches(self, address: str) -> bool: + return bool(self.address.search(address.lower().removesuffix('.'))) + + def __repr__(self) -> str: + return f'~{self.address.pattern!r}' + + @staticmethod + @parser_action + def from_parse_results(toks: pyparsing.ParseResults) -> AddressRegexMatch: + return AddressRegexMatch(toks[0]) + + +class AddressFilter(SingleExprFilter): + # match mails by address + def __init__(self, selector: AddressSelector, patterns: list[BaseAddressPattern]) -> None: + self.selector = selector + self.patterns = patterns + + def matches(self, mail: Mail) -> bool: + for address in self.selector.extract(mail): + for pattern in self.patterns: + if pattern.matches(address): + return True + return False + + def possible_queues(self) -> set[QueueName]: + return ALL_QUEUE_NAMES + + def __repr__(self) -> str: + if not self.patterns: + return '0' + return f'{self.selector.value} {" ".join(repr(p) for p in self.patterns)}' + + @staticmethod + @parser_action + def from_parse_results(toks: pyparsing.ParseResults) -> AddressFilter: + return AddressFilter( + selector=AddressSelector(toks[0]), + patterns=list(toks[1]), + ) + + +# create parser for filter expressions +def build_filter_parser() -> typing.Callable[[str], Filter]: + addr_expr_selector = pyparsing.MatchFirst(( + pyparsing.Keyword(sel.value) + for sel in AddressSelector + )) + squoted = pyparsing.QuotedString(quoteChar="'", escChar='\\', unquoteResults=True) + dquoted = pyparsing.QuotedString(quoteChar='"', escChar='\\', unquoteResults=True) + quoted = squoted | dquoted + addr_expr_pattern_q_re = (pyparsing.Suppress('~') + quoted).setParseAction(AddressRegexMatch.from_parse_results) + addr_expr_pattern_q = quoted.copy().setParseAction(AddressPattern.from_parse_results) + addr_expr_pattern = pyparsing.Word(pyparsing.printables, excludeChars="'\"") \ + .setParseAction(BaseAddressPattern.from_unquoted_parse_results) + addr_expr_patterns = pyparsing.MatchFirst([ + pyparsing.OneOrMore(addr_expr_pattern_q_re | addr_expr_pattern_q), + addr_expr_pattern, + ]) + addr_expr = ( + addr_expr_selector + pyparsing.Optional(pyparsing.Suppress('=')) + pyparsing.Group(addr_expr_patterns) + ).setParseAction(AddressFilter.from_parse_results) + queue_expr_name = pyparsing.MatchFirst([pyparsing.Keyword(qn.value) for qn in QueueName]) + queue_expr_single = queue_expr_name + queue_expr_mult = ( + pyparsing.Suppress(pyparsing.Keyword('queue')) + + queue_expr_name + + 
pyparsing.ZeroOrMore(pyparsing.Suppress(',') + queue_expr_name) + ) + queue_expr = (queue_expr_single | queue_expr_mult).setParseAction(QueueFilter.from_parse_results) + const_true = pyparsing.Keyword('1').setParseAction(lambda x: TRUE_FILTER) + const_false = pyparsing.Keyword('0').setParseAction(lambda x: FALSE_FILTER) + base_expr = addr_expr | queue_expr | const_true | const_false + parser = pyparsing.infixNotation(base_expr, [ + ('not', 1, pyparsing.opAssoc.RIGHT, NotFilter.from_parse_results), + ('or', 2, pyparsing.opAssoc.LEFT, OrFilter.from_parse_results), + ('and', 2, pyparsing.opAssoc.LEFT, AndFilter.from_parse_results), + ]) + + def parse(input: str) -> Filter: + return parser.parseString(input, parseAll=True)[0] + + return parse + + +FILTER_PARSER = build_filter_parser() + + +# TODO: proper type hinting for decorator builder? +# commands are methods in the CLI class and need to be decorated with this. +# the name defaults to the name of the function. +def register_command(*, name: str | tuple[()] = ()): + def register(cmd: typing.Callable) -> typing.Callable: + setattr(cmd, '__command', name) + return cmd + + return register + + +class CLI: + def __init__(self, source: Source) -> None: + self.source = source + self.current_filter: AndFilter = AndFilter() + self._simple_commands: dict[str, typing.Callable[[], typing.Awaitable[None]]] = {} + self._long_commands: dict[str, typing.Callable[[str], typing.Awaitable[None]]] = {} + for name in dir(self): + prop = getattr(self, name) + if callable(prop): + command = getattr(prop, '__command', None) + if command is None: continue + if command == (): + cmd_name = name + else: + assert isinstance(command, str) + cmd_name = command + sig = inspect.signature(prop) + simple = False + long = False + if len(sig.parameters) == 0: + simple = True + elif len(sig.parameters) == 1: + long = True + param = list(sig.parameters.values())[0] + if not param.default is inspect.Parameter.empty: + simple = True + else: + raise TypeError('Commands must take zero or one arguments') + if simple: + self._simple_commands[cmd_name] = prop + if long: + self._long_commands[cmd_name] = prop + self._known_commands = set(self._simple_commands) | set(self._long_commands) + self._init_readline() + self._completion_cache: list[str] = [] + + def _readline_completer(self, text: str, state: int) -> str | None: + text_lstripped = text.lstrip() + front_space = text[:len(text) - len(text_lstripped)] + text = text_lstripped + if len(text.split(maxsplit=1)) > 1: + # not in first word (command) anymore - no more completions + return None + if text.rstrip() != text: + # whitespace after first word (command) - no more completions + return None + if state == 0: + self._completion_cache = [ + front_space + cmd + for cmd in self._known_commands + if cmd.startswith(text) + ] + if state >= len(self._completion_cache): + return None + return self._completion_cache[state] + + def _init_readline(self) -> None: + readline.read_init_file() + # completer + readline.parse_and_bind("tab: complete") + readline.set_completer(self._readline_completer) + readline.set_completer_delims("") # want to see full buffer in completer + # history handling: + self._history_file = os.path.join(os.path.expanduser("~"), ".pqm_history") + try: + readline.read_history_file(self._history_file) + except FileNotFoundError: + pass + previous_history_len = readline.get_current_history_length() + readline.set_auto_history(True) + readline.set_history_length(100) + + def finish_readline() -> None: + new_entries = 
readline.get_current_history_length() - previous_history_len
+            if new_entries > 0:
+                try:
+                    readline.append_history_file(new_entries, self._history_file)
+                except FileNotFoundError:
+                    readline.write_history_file(self._history_file)
+
+        atexit.register(finish_readline)
+
+    @register_command()
+    async def help(self) -> None:
+        """Show all available commands"""
+        all_cmds: list[tuple[str, str]] = []
+        for cmd, func1 in self._simple_commands.items():
+            if self._long_commands.get(cmd, None) is func1:
+                all_cmds.append((f'{cmd} [<args>]', func1.__doc__ or 'No help available'))
+            else:
+                all_cmds.append((cmd, func1.__doc__ or 'No help available'))
+        for cmd, func2 in self._long_commands.items():
+            if self._simple_commands.get(cmd, None) is func2:
+                continue
+            all_cmds.append((f'{cmd} <args>', func2.__doc__ or 'No help available'))
+        for cmd, doc in sorted(all_cmds):
+            doc = doc.strip().splitlines()[0]
+            print(f'{cmd:<30} {doc}', file=sys.stderr)
+        print(file=sys.stderr)
+
+    @register_command(name="help")
+    async def help_command(self, args: str) -> None:
+        """Show detailed help for a command"""
+        command = args.strip()
+        if not command in self._known_commands:
+            print(f'Command {command} not known, no help available', file=sys.stderr)
+            return
+        simple_command = self._simple_commands.get(command, None)
+        long_command = self._long_commands.get(command, None)
+        if simple_command is long_command:
+            print(f'>>>> {command} [<args>]', file=sys.stderr)
+            print(file=sys.stderr)
+            doc = inspect.cleandoc(simple_command.__doc__ or 'No help available')
+            print(doc, file=sys.stderr)
+            print(file=sys.stderr)
+            return
+        if simple_command:
+            print(f'>>>> {command}', file=sys.stderr)
+            print(file=sys.stderr)
+            doc = inspect.cleandoc(simple_command.__doc__ or 'No help available')
+            print(doc, file=sys.stderr)
+            print(file=sys.stderr)
+        if long_command:
+            print(f'>>>> {command} <args>', file=sys.stderr)
+            print(file=sys.stderr)
+            doc = inspect.cleandoc(long_command.__doc__ or 'No help available')
+            print(doc, file=sys.stderr)
+            print(file=sys.stderr)
+
+    @register_command()
+    async def exit(self) -> None:
+        """Exit pqm (or press Ctrl-D on empty prompt)"""
+        # leave main loop
+        raise KeyboardInterrupt()
+
+    def prompt_if_empty_filter(self) -> bool:
+        if self.current_filter.expressions:
+            return True
+        print(
+            "You currently have no filters configured, which means this action will "
+            "potentially run for a lot of mails",
+            file=sys.stderr,
+        )
+        ack = input_ack("Continue anyway (y/N)? ")
+        if ack.lower()[:1] != "y":
+            return False
+        return True
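+
+    # `health` output might look like this (illustrative, with filters configured):
+    #   Servers: 2 (mx1, mx2)
+    #   Active: 1 (total: 4)
+    #   Deferred: 37 (total: 912)
+    #   Hold: 0 (total: 2)
+    #
+    #   Looking for users with many open communications matching the current search criteria
+    #   User bob@example.com has 35 open communications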
+ """ + count_q_other = 0 + count_q = { + QueueName.ACTIVE: 0, + QueueName.DEFERRED: 0, + QueueName.HOLD: 0, + } + total_q_other = 0 + total_q = { + QueueName.ACTIVE: 0, + QueueName.DEFERRED: 0, + QueueName.HOLD: 0, + } + stats = UserActivityStats() + for item in await self.source.get_list(): + if item.queue_name in total_q: + total_q[item.queue_name] += 1 + else: + total_q_other += 1 + if self.current_filter.matches(item): + if item.queue_name in count_q: + count_q[item.queue_name] += 1 + else: + count_q_other += 1 + stats.count(item) + stats.prepare() + servers = self.source.server_list() + print(f"Servers: {len(servers)} ({', '.join(servers)})") + if self.current_filter.expressions: + print(f"Active: {count_q[QueueName.ACTIVE]} (total: {total_q[QueueName.ACTIVE]})") + print(f"Deferred: {count_q[QueueName.DEFERRED]} (total: {total_q[QueueName.DEFERRED]})") + print(f"Hold: {count_q[QueueName.HOLD]} (total: {total_q[QueueName.HOLD]})") + if count_q_other: + print(f"Other: {count_q_other} (total: {total_q_other}") + else: + print(f"Active: {count_q[QueueName.ACTIVE]}") + print(f"Deferred: {count_q[QueueName.DEFERRED]}") + print(f"Hold: {count_q[QueueName.HOLD]}") + if count_q_other: + print(f"Other: {count_q_other}") + print() + print("Looking for users with more many open communications matching the current search criteria") + while True: + x = stats.extract_highest_user() + if not x: break + user, activity, mails = x + print("User {} has {} open communications".format(user, activity)) + + QUEUE_FLAGS = { + QueueName.HOLD: '!', + QueueName.ACTIVE: '*', + QueueName.CORRUPT: '?', + QueueName.INCOMING: '$', + QueueName.MAILDROP: '~', + QueueName.DEFERRED: ' ', + } + + def display_list_item(self, *, item: Mail, verbose: bool) -> None: + flag = CLI.QUEUE_FLAGS.get(item.queue_name, ' ') + if verbose: + print( + f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} {item.sender:<60s}" + ) + if not item.recipients: + print(f"{'':21}No recipients listed for this mail?") + for recpt in item.recipients: + print(f"{'':21}{recpt.address}") + if recpt.delay_reason: + print(f"{'':29}{recpt.delay_reason}") + else: + cnt_recpts = len(item.recipients) + if cnt_recpts: + last_recpt = item.recipients[-1].address + print( + f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} " + f"{item.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})" + ) + else: + print( + f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} " + f"{item.sender:<60s} (No recipients listed for this mail?)" + ) + + async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None: + flt: AndFilter + if args: + # additional restrictions + try: + parsed_flt = FILTER_PARSER(args) + except pyparsing.ParseException as e: + print(pyparsing.ParseException.explain(e, 0)) + # print(e.explain(depth=0), file=sys.stderr) + return + flt = self.current_filter & parsed_flt + else: + flt = self.current_filter + show_filter: Filter + if not all and not flt.expressions: + # by default hide active and hold, unless -all variant is used or specific filters are used + show_filter = NotFilter(QueueFilter([QueueName.ACTIVE, QueueName.HOLD])) + else: + show_filter = flt + for item in show_filter.filter(await self.source.get_list()): + self.display_list_item(item=item, verbose=verbose) + + async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]: + mails = { + mail.queue_id: mail + for mail in await 
+    async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None:
+        flt: AndFilter
+        if args:
+            # additional restrictions
+            try:
+                parsed_flt = FILTER_PARSER(args)
+            except pyparsing.ParseException as e:
+                print(pyparsing.ParseException.explain(e, 0))
+                # print(e.explain(depth=0), file=sys.stderr)
+                return
+            flt = self.current_filter & parsed_flt
+        else:
+            flt = self.current_filter
+        show_filter: Filter
+        if not all and not flt.expressions:
+            # by default hide active and hold, unless -all variant is used or specific filters are used
+            show_filter = NotFilter(QueueFilter([QueueName.ACTIVE, QueueName.HOLD]))
+        else:
+            show_filter = flt
+        for item in show_filter.filter(await self.source.get_list()):
+            self.display_list_item(item=item, verbose=verbose)
+
+    async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]:
+        mails = {
+            mail.queue_id: mail
+            for mail in await self.source.get_list()
+        }
+        missing = []
+        verified_list = []
+        for queue_id in given_ids:
+            item = mails.get(queue_id, None)
+            if item:
+                self.display_list_item(item=item, verbose=False)
+                verified_list.append(item)
+            else:
+                missing.append(queue_id)
+        if missing:
+            print(f"Couldn't find mails with the following IDs: {' '.join(missing)}", file=sys.stderr)
+        return verified_list
+
+    @register_command()
+    async def list(self, args: str = '') -> None:
+        """
+        List all mails (single line per mail); if no filter is configured hide active and hold queue
+
+        Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
+        Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
+        """
+        await self.list_impl(args=args, verbose=False, all=False)
+
+    @register_command(name="list-verbose")
+    async def list_verbose(self, args: str = '') -> None:
+        """
+        List all mails (verbose output); if no filter is configured hide active and hold queue
+
+        Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
+        Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
+        """
+        await self.list_impl(args=args, verbose=True, all=False)
+
+    @register_command(name="list-all")
+    async def list_all(self, args: str = '') -> None:
+        """
+        List all mails (including active + hold queue, single line per mail)
+
+        Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
+        Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
+        """
+        await self.list_impl(args=args, verbose=False, all=True)
+
+    @register_command(name="list-verbose-all")
+    async def list_verbose_all(self, args: str = '') -> None:
+        """
+        List all mails (including active + hold queue, verbose output)
+
+        Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
+        Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
+        """
+        await self.list_impl(args=args, verbose=True, all=True)
+
+    @register_command(name="select")
+    async def select(self, args: str) -> None:
+        """
+        Add filter for mails that other commands work on
+
+        Filter syntax:
+        - `from bob@example.com`, `from alice@example.com,bob@example.com,@example.net`, `from "alice@example.com" "bob@example.com"`
+          Multiple addresses can be given to match any of them; either unquoted and comma separated, or quoted and whitespace
+          separated. In the unquoted case the (comma separated) pattern must not include any whitespace.
+          An address starting with `@` only checks the domain name and ignores the local part.
+        - `from ~regex.*@example.(com|net)`, `from ~"regex.*@example.(com|net)" "other@example.com"`
+          To use regular expressions either put `~` in front of an unquoted pattern (need to repeat for each regex in a
+          comma separated pattern list) or before the quotes of a quoted pattern.
+        - `to ...` and `address ...` (matching both to and from) work the same as `from ...`
+        - `queue hold,deferred`
+          Comma separated list of queues a mail must be in. For single queue names `queue` can be omitted, e.g. `hold`.
+ - `$expr1 and $expr2` + - `$expr1 or $expr2` (`a and b or c` is parsed as `a and (b or c)`) + - `($expr)` + - `not $expr` + """ + try: + flt = FILTER_PARSER(args) + except pyparsing.ParseException as e: + print(pyparsing.ParseException.explain(e, 0)) + # print(e.explain(depth=0), file=sys.stderr) + return + self.current_filter.expressions.append(flt) + + @register_command() + async def clear(self) -> None: + """Clear all filters""" + self.current_filter.expressions = [] + + @register_command(name="clear") + async def clear_ndx(self, args: str) -> None: + """Clear filter with given index (zero based; first filter is at index 0)""" + ndx = int(args) + self.current_filter.expressions.pop(ndx) + + @register_command(name="pop") + async def pop(self) -> None: + """Remove last filter""" + if self.current_filter.expressions: + self.current_filter.expressions.pop() + else: + print("No filters configured, nothing to pop", file=sys.stderr) + + @register_command() + async def current(self) -> None: + """Show current filters""" + for e in self.current_filter.expressions: + print(f'select {e}', file=sys.stderr) + + _SHOW_ARGS_PARSER = argparse.ArgumentParser('show', description="Show mails", add_help=False) + _SHOW_ARGS_PARSER.add_argument('-b', dest='body', action='store_true', help="Show body content") + _SHOW_ARGS_PARSER.add_argument('-e', dest='envelope', action='store_true', help="Show message envelope content.") + _SHOW_ARGS_PARSER.add_argument('-h', dest='header', action='store_true', help="Show message header content.") + _SHOW_ARGS_PARSER.add_argument('ID', nargs='+') + + @register_command() + async def show(self, args: str) -> None: + ns_args = self._SHOW_ARGS_PARSER.parse_args(shlex.split(args)) + if ns_args.body or ns_args.envelope or ns_args.header: + flags = [] + if ns_args.body: flags += ['-b'] + if ns_args.envelope: flags += ['-e'] + if ns_args.header: flags += ['-h'] + else: + # default to header only for now + flags = ['-h'] + + head = 16 * "-" + + async def print_mail(result: tuple[str, bytes, bytes]) -> None: + msg, mail, stderr = result + print(f'{head} {msg:^16} {head}') + sys.stdout.flush() + if stderr: + sys.stderr.buffer.write(stderr) + sys.stderr.flush() + sys.stdout.buffer.write(mail) + sys.stdout.flush() + + async def get_mail(msg: str) -> tuple[str, bytes, bytes]: + mail, err = await self.source.get_mail(msg, flags=flags) + if err and not err.endswith(b'\n'): + err += b'\n' + mail = mail or b'' + if mail and not mail.endswith(b'\n'): + mail += b'\n' + return (msg, mail, err) + + async with trio_parallel_ordered(print_mail) as tpo: + for msg in ns_args.ID: + await tpo.start(get_mail, msg) + + print(head + 18*"-" + head) + show.__doc__ = _SHOW_ARGS_PARSER.format_help() + + @register_command(name="flush-all") + async def flush_all(self) -> None: + """ + Flush the queue: attempt to deliver all queued mail. + + Ignores the current filters (but prompts if you have filters). + """ + if self.current_filter.expressions: + print( + "You have currently configured filters, but this command will flush all mails, whether they match the filter or not.\n" + "You might want to use the `flush` command instead (or clear the filters before)", + file=sys.stderr, + ) + ack = input_ack("Continue anyway (y/N)? 
") + if ack.lower()[:1] != "y": + return + await self.source.flush_all() + + @register_command() + async def flush(self) -> None: + """Flush (deferred) selected mails in the queue: attempt to deliver them""" + if not self.prompt_if_empty_filter(): + return + flt = QueueFilter([QueueName.DEFERRED]) & self.current_filter + await self.source.flush(flt.filter_ids(await self.source.get_list())) + + @register_command(name="flush") + async def flush_args(self, args: str) -> None: + """Flush (deferred) mails with given IDs: attempt to deliver them""" + await self.source.flush(args.strip().split()) + + async def delete_impl(self, flt: Filter) -> None: + mails = list(flt.filter(await self.source.get_list())) + for item in mails: + self.display_list_item(item=item, verbose=False) + ack = input_ack("Really delete those mails (y/N)? ") + if ack.lower()[:1] != "y": + return + queues = flt.possible_queues() # restrict to selected queues to prevent accidents + await self.source.delete((mail.queue_id for mail in mails), from_queues=queues) + + @register_command() + async def delete(self) -> None: + """Delete selected mails from the hold queue""" + if not self.current_filter.expressions: + print("Command not allowed without filter") + return + # restrict to hold queue + flt = QueueFilter([QueueName.HOLD]) & self.current_filter + await self.delete_impl(flt) + + @register_command(name="delete-all") + async def delete_all(self) -> None: + """Delete selected mails from all queues""" + if not self.current_filter.expressions: + print("Command not allowed without filter") + return + await self.delete_impl(self.current_filter) + + @register_command(name="delete") + async def delete_args(self, args: str) -> None: + """Delete mails with given IDs from the queues""" + delete_list = [mail.queue_id for mail in await self._print_mails_with_ids_for_ack(args.strip().split())] + ack = input_ack("Really delete those mails (y/N)? ") + if ack.lower()[:1] != "y": + return + await self.source.delete(delete_list) + + @register_command() + async def hold(self) -> None: + """Put selected mails on hold""" + if not self.prompt_if_empty_filter(): + return + # basically: skip mails that are already on hold + restrict_from_queues = [QueueName.INCOMING, QueueName.ACTIVE, QueueName.DEFERRED] + flt = QueueFilter(restrict_from_queues) & self.current_filter + queues = flt.possible_queues() + await self.source.hold( + flt.filter_ids(await self.source.get_list()), + from_queues=queues, + ) + + @register_command(name="hold") + async def hold_args(self, args: str) -> None: + """Put mails with given IDs on hold""" + await self.source.hold( + args.strip().split(), + ) + + @register_command() + async def release(self) -> None: + """Release mail that was put "on hold".""" + if not self.prompt_if_empty_filter(): + return + flt = QueueFilter([QueueName.HOLD]) & self.current_filter + queues = flt.possible_queues() + await self.source.release( + flt.filter_ids(await self.source.get_list()), + from_queues=queues, + ) + + @register_command(name="release") + async def release_args(self, args: str) -> None: + """Release mails with given IDs that were put "on hold".""" + await self.source.release( + args.strip().split(), + ) + + @register_command() + async def requeue(self) -> None: + """Requeue mail (move to "maildrop"; get reprocessed)""" + ack = input_ack("Requeue is a rather unusual operation. Do you know what you are doing and want to continue (y/N)? 
") + if ack.lower()[:1] != "y": + return + if not self.prompt_if_empty_filter(): + return + flt = self.current_filter + queues = flt.possible_queues() + await self.source.requeue( + flt.filter_ids(await self.source.get_list()), + from_queues=queues, + ) + + @register_command(name="requeue") + async def requeue_args(self, args: str) -> None: + """Requeue mail with given IDs (move to "maildrop"; get reprocessed)""" + ack = input_ack("Requeue is a rather unusual operation. Do you know what you are doing and want to continue (y/N)? ") + if ack.lower()[:1] != "y": + return + await self.source.requeue( + args.strip().split(), + ) + + @register_command() + async def expire(self) -> None: + """Expire mails and flush them (return error to sender).""" + if not self.prompt_if_empty_filter(): + return + flt = self.current_filter + mails = list(flt.filter(await self.source.get_list())) + for item in mails: + self.display_list_item(item=item, verbose=False) + ack = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ") + if ack.lower()[:1] != "y": + return + queues = flt.possible_queues() + await self.source.expire((mail.queue_id for mail in mails), from_queues=queues) + await self.source.flush((mail.queue_id for mail in mails if mail.queue_name == QueueName.DEFERRED)) + + @register_command(name="expire") + async def expire_args(self, args: str) -> None: + """Expire mails (return error to sender) with given IDs and flush them.""" + mails = await self._print_mails_with_ids_for_ack(args.strip().split()) + ack = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ") + if ack.lower()[:1] != "y": + return + await self.source.expire((mail.queue_id for mail in mails)) + await self.source.flush((mail.queue_id for mail in mails if mail.queue_name == QueueName.DEFERRED)) + + @register_command(name="expire-release") + async def expire_release(self) -> None: + """Expire mails and flush them (return error to sender); release if mails are on hold.""" + if not self.prompt_if_empty_filter(): + return + flt = self.current_filter + mails = list(flt.filter(await self.source.get_list())) + for item in mails: + self.display_list_item(item=item, verbose=False) + ack = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ") + if ack.lower()[:1] != "y": + return + queues = flt.possible_queues() + await self.source.expire_release((mail.queue_id for mail in mails), from_queues=queues) + await self.source.flush((mail.queue_id for mail in mails if mail.queue_name in (QueueName.DEFERRED, QueueName.HOLD))) + + @register_command(name="expire-release") + async def expire_release_args(self, args: str) -> None: + """Expire mails (return error to sender) with given IDs and flush them; release if mails are on hold.""" + mails = await self._print_mails_with_ids_for_ack(args.strip().split()) + ack = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? 
") + if ack.lower()[:1] != "y": + return + await self.source.expire_release((mail.queue_id for mail in mails)) + await self.source.flush((mail.queue_id for mail in mails if mail.queue_name in (QueueName.DEFERRED, QueueName.HOLD))) + + @register_command(name="expire-noflush") + async def expire_noflush(self) -> None: + """Expire mails (return error to sender on next delivery attempt).""" + if not self.prompt_if_empty_filter(): + return + flt = self.current_filter + mails = list(flt.filter(await self.source.get_list())) + for item in mails: + self.display_list_item(item=item, verbose=False) + ack = input_ack("Really expire those mails (y/N)? ") + if ack.lower()[:1] != "y": + return + queues = flt.possible_queues() + await self.source.expire((mail.queue_id for mail in mails), from_queues=queues) + + @register_command(name="expire-noflush") + async def expire_noflush_args(self, args: str) -> None: + """Expire mails (return error to sender on next delivery attempt) with given IDs.""" + mails = await self._print_mails_with_ids_for_ack(args.strip().split()) + ack = input_ack("Really expire those mails (y/N)? ") + if ack.lower()[:1] != "y": + return + await self.source.expire((mail.queue_id for mail in mails)) + + async def prompt(self) -> None: + # main loop of CLI + while True: + try: + full_line = input('# ') + except KeyboardInterrupt: # ctrl-c + print('^C', file=sys.stderr) + continue + except EOFError: # ctrl-d + print() # newline after prompt + return + stripped_line = full_line.strip() + if full_line.startswith(' '): + # don't add space prefixed lines to history + readline.remove_history_item(readline.get_current_history_length() - 1) + elif stripped_line != full_line: + # don't store trailing spaces + readline.replace_history_item(readline.get_current_history_length() - 1, stripped_line) + parts = stripped_line.split(maxsplit=1) + if not parts: + continue + command = parts[0] + has_arg = len(parts) > 1 + try: + if has_arg and command in self._long_commands: + await self._long_commands[command](parts[1]) + continue + elif not has_arg and command in self._simple_commands: + await self._simple_commands[command]() + continue + except (Exception, KeyboardInterrupt): + # TODO: skip first frame (only pointing to line above) + traceback.print_exc() + continue + # unknown command / invalid invocation + if command in self._simple_commands: + print(f"Command {command} doesn't take any arguments", file=sys.stderr) + elif command in self._long_commands: + print(f"Command {command} requires arguments", file=sys.stderr) + else: + print(f"Unknown command: {command}", file=sys.stderr) + + +def main() -> None: + parser = argparse.ArgumentParser(description="postfix (cluster) queue manager") + parser.add_argument('--config', '-c', type=pathlib.Path, help="Path to config file (default: try ~/.config/pqm.yaml, /etc/pqm.yaml)") + args = parser.parse_args() + config = Config.load(args.config or '') + cli = CLI(source=config.source()) + + try: + trio.run(cli.prompt) + except KeyboardInterrupt: + pass + + +main() diff --git a/pqm.example.yaml b/pqm.example.yaml new file mode 100644 index 0000000..4fe5e3c --- /dev/null +++ b/pqm.example.yaml @@ -0,0 +1,14 @@ +--- +# when remote-sources are not given / are empty: defaults to localhost (without ssh) +# each remote source is used as ssh target (with `root@` prefix); custom ssh +# option (separate port, ...) should be configured through ssh config files. 
+remote-sources: +- mx1.example.com +- mx2.example.com +- mxout1.example.com +- mxout2.example.com +- mxintern1.example.com +- mxintern2.example.com +# prefix with "alias:" if you don't want the first DNS label to be the alias +- orgmx1:mx1.example.org +- orgmx2:mx2.example.org
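+# for illustration: with the entries above, "mx1.example.com" would get the
+# alias "mx1" (its first DNS label), while "orgmx1:mx1.example.org" would get
+# the explicit alias "orgmx1"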