diff --git a/pqm b/pqm index 74ea4f8..8a8e54e 100755 --- a/pqm +++ b/pqm @@ -14,6 +14,7 @@ import datetime import enum import functools import inspect +import io import json import os import os.path @@ -28,6 +29,7 @@ import sys import traceback import trio import typing +import uuid import yaml T = typing.TypeVar('T') @@ -35,11 +37,25 @@ T = typing.TypeVar('T') @dataclasses.dataclass(slots=True) class Config: + DEFAULT_FORWARD_NOTE: str = """\ +The postmaster decided you should know about the following mails in +the queue, probably because you need to fix something in your setup. + +The postmaster will probably delete the queued message afterwards, +or have them expire (i.e. send a bounce message about failed delivery +to the sender). +""" + # if not remote_sources are configured -> use local queue remote_sources: dict[str, str] = dataclasses.field(default_factory=dict) + forward_note: str = DEFAULT_FORWARD_NOTE + forward_sender: str = 'MAILER-DAEMON' + forward_sender_name: str = 'Mail Delivery System' @staticmethod def load(filename: str = '') -> Config: + config = Config() + if not filename: default_paths = [ os.path.expanduser('~/.config/pqm.yaml'), @@ -51,13 +67,13 @@ class Config: break else: # no configfile -> default config - return Config() + return config with open(filename) as file: data = yaml.safe_load(file) assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}" - rs_list = data.get('remote-sources', []) + + rs_list = data.pop('remote-sources', []) assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}" - remote_sources: dict[str, str] = {} for entry in rs_list: assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}" if ':' in entry: @@ -66,16 +82,33 @@ class Config: target = entry alias = entry.split('.', maxsplit=1)[0] assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}" - if alias in remote_sources: - raise ValueError(f'Duplicate alias {alias!r} in remote-sources (have: {remote_sources[alias]!r}, new: {target!r}') - remote_sources[alias] = target - return Config(remote_sources=remote_sources) + if alias in config.remote_sources: + raise ValueError( + f'Duplicate alias {alias!r} in remote-sources' + f' (have: {config.remote_sources[alias]!r}, new: {target!r}', + ) + config.remote_sources[alias] = target + + config.forward_note = data.pop('forward-note', config.forward_note) + config.forward_sender = data.pop('forward-sender', config.forward_sender) + config.forward_sender_name = data.pop('forward-sender-name', config.forward_sender_name) + + assert not data, f"Unknown config options: {data}" + + return config + + def forward_build_from(self, *, hostname: str) -> str: + if '@' in self.forward_sender: + sender = self.forward_sender + else: + sender = f'{self.forward_sender}@{hostname}' + return f'{self.forward_sender_name} <{sender}>' def source(self) -> Source: if self.remote_sources: - return RemoteSource(remotes=self.remote_sources) + return RemoteSource(config=self, remotes=self.remote_sources) else: - return LocalSource() + return LocalSource(config=self) def input_ack(prompt: str) -> str: @@ -98,6 +131,62 @@ def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.C return wrapped +def format_mail_forward( + *, + config: Config, + mails: dict[str, tuple[Mail, bytes | None]], + hostname: str, + recipients: typing.Iterable[str], +) -> bytes: + bin_buf = io.BytesIO() + buf = io.TextIOWrapper(bin_buf, write_through=True) + buf.write(f'From: {config.forward_build_from(hostname=hostname)}\n') + for recp in recipients: + buf.write(f'To: <{recp}>\n') + buf.write(f"Subject: Deferred mails on {hostname}\n") + buf.write("MIME-Version: 1.0\n") + boundary = f"postmaster-forward-{uuid.uuid4().hex}" + buf.write(f"Content-Type: multipart/mixed; boundary=\"{boundary}\"\n") + buf.write("\n") + buf.write("This is a message with multiple parts in MIME format.\n") + + buf.write(f"--{boundary}\n") + buf.write("Content-Type: text/plain\n") + buf.write("\n") + buf.write(config.forward_note) + if bytes(bin_buf.getbuffer()[-1:]) != b'\n': + buf.write("\n") + + buf.write(f"--{boundary}\n") + buf.write("Content-Type: text/plain\n") + buf.write("Content-Disposition: inline; filename=\"queue-list.txt\"\n") + buf.write("\n") + for queue_id, (mail, _) in mails.items(): + mail.print(verbose=True, out=buf) + + for queue_id, (_, mail_message) in mails.items(): + basename = queue_id.replace('/', '_') + if mail_message is None: + # this should be very unlikely unless you try to forward messages from the active queue + buf.write(f"--{boundary}\n") + buf.write("Content-Type: text/plain\n") + buf.write(f"Content-Disposition: inline; filename=\"{basename}.txt\"\n") + buf.write("\n") + buf.write(f"Message {queue_id} couldn't be found (anymore); may have been sent.") + continue + buf.write(f"--{boundary}\n") + buf.write("Content-Type: message/rfc822\n") + buf.write(f"Content-Disposition: inline; filename=\"{basename}.eml\"\n") + buf.write("\n") + bin_buf.write(mail_message) + # ensure parts are terminated by newline + if bytes(bin_buf.getbuffer()[-1:]) != b'\n': + buf.write("\n") + + buf.write(f"--{boundary}--\n") + return bin_buf.getvalue() + + class TrioParallelOrdered(typing.Generic[T]): # used by trio_parallel_ordered below @@ -230,10 +319,42 @@ class Mail: queue.append(mail) return queue + def print(self, *, verbose: bool, out: typing.TextIO = sys.stdout) -> None: + flag = CLI.QUEUE_FLAGS.get(self.queue_name, ' ') + if verbose: + print( + f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} {self.sender:<60s}", + file=out, + ) + if not self.recipients: + print(f"{'':21}No recipients listed for this mail?", file=out) + for recpt in self.recipients: + print(f"{'':21}{recpt.address}", file=out) + if recpt.delay_reason: + print(f"{'':29}{recpt.delay_reason}", file=out) + else: + cnt_recpts = len(self.recipients) + if cnt_recpts: + last_recpt = self.recipients[-1].address + print( + f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} " + f"{self.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})", + file=out, + ) + else: + print( + f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} " + f"{self.sender:<60s} (No recipients listed for this mail?)", + file=out, + ) + # abstract collection/cluster of postfix nodes (or just a single one) class Source(abc.ABC): - __slots__ = () + __slots__ = ('config',) + + def __init__(self, *, config: Config) -> None: + self.config = config # list of server names in collection @abc.abstractmethod @@ -331,10 +452,17 @@ class Source(abc.ABC): """ await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues) + @abc.abstractmethod + async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None: + raise NotImplementedError() + class LocalSource(Source): __slots__ = () + def __init__(self, *, config: Config) -> None: + super().__init__(config=config) + def server_list(self) -> list[str]: return [socket.gethostname()] @@ -390,12 +518,26 @@ class LocalSource(Source): data = ''.join(f'{msg}\n' for msg in queue_ids) await trio.run_process(cmd, stdin=data.encode()) + async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None: + mail_messages: dict[str, tuple[Mail, bytes | None]] = {} + for mail in mails: + (mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b']) + if mail_message is None: + print(f"Failed to get mail {mail.queue_id}: {err.decode()}") + mail_messages[mail.queue_id] = (mail, mail_message) + if not mail_messages: + print("No mails found to forward.") + return + message = format_mail_forward(config=self.config, mails=mail_messages, hostname=socket.getfqdn(), recipients=recipients) + cmd = ['sendmail', '-t', '-bm', '-f', ''] + await trio.run_process(cmd, stdin=message) + class RemoteSource(Source): __slots__ = ('remotes',) - def __init__(self, remotes: dict[str, str]) -> None: - super().__init__() + def __init__(self, *, config: Config, remotes: dict[str, str]) -> None: + super().__init__(config=config) self.remotes = remotes def server_list(self) -> list[str]: @@ -524,6 +666,37 @@ class RemoteSource(Source): data = ''.join(f'{msg}\n' for msg in ids) nursery.start_soon(run_host, remote_host, data) + async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None: + mails_by_id = { + mail.queue_id: mail + for mail in mails + } + + if not mails_by_id: + print("No mails found to forward.") + + async def run_host(host: str, remote_host: str, queue_ids: list[str]) -> None: + mail_messages: dict[str, tuple[Mail, bytes | None]] = {} + for queue_id in queue_ids: + mail = mails_by_id[f'{host}/{queue_id}'] + (mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b']) + if mail_message is None: + print(f"Failed to get mail {mail.queue_id}: {err.decode()}") + mail_messages[mail.queue_id] = (mail, mail_message) + if not mails: + return + message = format_mail_forward(config=self.config, mails=mail_messages, hostname=remote_host, recipients=recipients) + cmd = ['sendmail', '-t', '-bm', '-f', ''] + cmd = ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)] + await trio.run_process(cmd, stdin=message) + + by_host = self.aggregate_by_host(queue_ids=mails_by_id.keys(), allow_all=True) + + async with trio.open_nursery() as nursery: + for host, ids in by_host.items(): + remote_host = self.remotes[host] + nursery.start_soon(run_host, host, remote_host, ids) + class UserActivityStats: __slots__ = ('count_mails', 'related_mails', 'cut_off') @@ -1334,32 +1507,6 @@ class CLI: QueueName.DEFERRED: ' ', } - def display_list_item(self, *, item: Mail, verbose: bool) -> None: - flag = CLI.QUEUE_FLAGS.get(item.queue_name, ' ') - if verbose: - print( - f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} {item.sender:<60s}" - ) - if not item.recipients: - print(f"{'':21}No recipients listed for this mail?") - for recpt in item.recipients: - print(f"{'':21}{recpt.address}") - if recpt.delay_reason: - print(f"{'':29}{recpt.delay_reason}") - else: - cnt_recpts = len(item.recipients) - if cnt_recpts: - last_recpt = item.recipients[-1].address - print( - f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} " - f"{item.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})" - ) - else: - print( - f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} " - f"{item.sender:<60s} (No recipients listed for this mail?)" - ) - async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None: flt: AndFilter if args: @@ -1380,7 +1527,7 @@ class CLI: else: show_filter = flt for item in show_filter.filter(await self.source.get_list()): - self.display_list_item(item=item, verbose=verbose) + item.print(verbose=verbose) async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]: mails = { @@ -1392,7 +1539,7 @@ class CLI: for queue_id in given_ids: item = mails.get(queue_id, None) if item: - self.display_list_item(item=item, verbose=False) + item.print(verbose=False) verified_list.append(item) else: missing.append(queue_id) @@ -1574,7 +1721,7 @@ class CLI: async def delete_impl(self, flt: Filter) -> None: mails = list(flt.filter(await self.source.get_list())) for item in mails: - self.display_list_item(item=item, verbose=False) + item.print(verbose=False) ack = input_ack("Really delete those mails (y/N)? ") if ack.lower()[:1] != "y": return @@ -1681,7 +1828,7 @@ class CLI: flt = self.current_filter mails = list(flt.filter(await self.source.get_list())) for item in mails: - self.display_list_item(item=item, verbose=False) + item.print(verbose=False) ack = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ") if ack.lower()[:1] != "y": return @@ -1707,7 +1854,7 @@ class CLI: flt = self.current_filter mails = list(flt.filter(await self.source.get_list())) for item in mails: - self.display_list_item(item=item, verbose=False) + item.print(verbose=False) ack = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ") if ack.lower()[:1] != "y": return @@ -1733,7 +1880,7 @@ class CLI: flt = self.current_filter mails = list(flt.filter(await self.source.get_list())) for item in mails: - self.display_list_item(item=item, verbose=False) + item.print(verbose=False) ack = input_ack("Really expire those mails (y/N)? ") if ack.lower()[:1] != "y": return @@ -1749,6 +1896,26 @@ class CLI: return await self.source.expire((mail.queue_id for mail in mails)) + @register_command(name="forward-to") + async def forward_to(self, args: str) -> None: + """ + Forward mails to given recipients. + + Specify recipients as simple mail addresses (e.g. `alice@example.com`, not `Alice `). + """ + if not args: + print("No recipients given") + return + if not self.prompt_if_empty_filter(): + return + mails = list(self.current_filter.filter(await self.source.get_list())) + for item in mails: + item.print(verbose=False) + ack = input_ack("Really forward those mails (y/N)? ") + if ack.lower()[:1] != "y": + return + await self.source.forward_to(mails=mails, recipients=args.split()) + async def prompt(self) -> None: # main loop of CLI while True: diff --git a/pqm.example.yaml b/pqm.example.yaml index 4fe5e3c..939e2ba 100644 --- a/pqm.example.yaml +++ b/pqm.example.yaml @@ -12,3 +12,14 @@ remote-sources: # prefix with "alias:" if you don't want the first DNS label to be the alias - orgmx1:mx1.example.org - orgmx2:mx2.example.org + +forward-note: | + The postmaster decided you should know about the following mails in + the queue, probably because you need to fix something in your setup. + + The postmaster will probably delete the queued message afterwards, + or have them expire (i.e. send a bounce message about failed delivery + to the sender). + +forward-sender: MAILER-DAEMON # @$mx gets appended if it doesn't contain an '@' +forward-sender-name: Mail Delivery System