Compare commits

...

2 Commits

2 changed files with 222 additions and 44 deletions

255
pqm
View File

@ -14,6 +14,7 @@ import datetime
import enum import enum
import functools import functools
import inspect import inspect
import io
import json import json
import os import os
import os.path import os.path
@ -28,6 +29,7 @@ import sys
import traceback import traceback
import trio import trio
import typing import typing
import uuid
import yaml import yaml
T = typing.TypeVar('T') T = typing.TypeVar('T')
@ -35,11 +37,25 @@ T = typing.TypeVar('T')
@dataclasses.dataclass(slots=True) @dataclasses.dataclass(slots=True)
class Config: class Config:
DEFAULT_FORWARD_NOTE: str = """\
The postmaster decided you should know about the following mails in
the queue, probably because you need to fix something in your setup.
The postmaster will probably delete the queued message afterwards,
or have them expire (i.e. send a bounce message about failed delivery
to the sender).
"""
# if not remote_sources are configured -> use local queue # if not remote_sources are configured -> use local queue
remote_sources: dict[str, str] = dataclasses.field(default_factory=dict) remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
forward_note: str = DEFAULT_FORWARD_NOTE
forward_sender: str = 'MAILER-DAEMON'
forward_sender_name: str = 'Mail Delivery System'
@staticmethod @staticmethod
def load(filename: str = '') -> Config: def load(filename: str = '') -> Config:
config = Config()
if not filename: if not filename:
default_paths = [ default_paths = [
os.path.expanduser('~/.config/pqm.yaml'), os.path.expanduser('~/.config/pqm.yaml'),
@ -51,13 +67,13 @@ class Config:
break break
else: else:
# no configfile -> default config # no configfile -> default config
return Config() return config
with open(filename) as file: with open(filename) as file:
data = yaml.safe_load(file) data = yaml.safe_load(file)
assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}" assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"
rs_list = data.get('remote-sources', [])
rs_list = data.pop('remote-sources', [])
assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}" assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}"
remote_sources: dict[str, str] = {}
for entry in rs_list: for entry in rs_list:
assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}" assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}"
if ':' in entry: if ':' in entry:
@ -66,16 +82,33 @@ class Config:
target = entry target = entry
alias = entry.split('.', maxsplit=1)[0] alias = entry.split('.', maxsplit=1)[0]
assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}" assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
if alias in remote_sources: if alias in config.remote_sources:
raise ValueError(f'Duplicate alias {alias!r} in remote-sources (have: {remote_sources[alias]!r}, new: {target!r}') raise ValueError(
remote_sources[alias] = target f'Duplicate alias {alias!r} in remote-sources'
return Config(remote_sources=remote_sources) f' (have: {config.remote_sources[alias]!r}, new: {target!r}',
)
config.remote_sources[alias] = target
config.forward_note = data.pop('forward-note', config.forward_note)
config.forward_sender = data.pop('forward-sender', config.forward_sender)
config.forward_sender_name = data.pop('forward-sender-name', config.forward_sender_name)
assert not data, f"Unknown config options: {data}"
return config
def forward_build_from(self, *, hostname: str) -> str:
if '@' in self.forward_sender:
sender = self.forward_sender
else:
sender = f'{self.forward_sender}@{hostname}'
return f'{self.forward_sender_name} <{sender}>'
def source(self) -> Source: def source(self) -> Source:
if self.remote_sources: if self.remote_sources:
return RemoteSource(remotes=self.remote_sources) return RemoteSource(config=self, remotes=self.remote_sources)
else: else:
return LocalSource() return LocalSource(config=self)
def input_ack(prompt: str) -> str: def input_ack(prompt: str) -> str:
@ -98,6 +131,62 @@ def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.C
return wrapped return wrapped
def format_mail_forward(
*,
config: Config,
mails: dict[str, tuple[Mail, bytes | None]],
hostname: str,
recipients: typing.Iterable[str],
) -> bytes:
bin_buf = io.BytesIO()
buf = io.TextIOWrapper(bin_buf, write_through=True)
buf.write(f'From: {config.forward_build_from(hostname=hostname)}\n')
for recp in recipients:
buf.write(f'To: <{recp}>\n')
buf.write(f"Subject: Deferred mails on {hostname}\n")
buf.write("MIME-Version: 1.0\n")
boundary = f"postmaster-forward-{uuid.uuid4().hex}"
buf.write(f"Content-Type: multipart/mixed; boundary=\"{boundary}\"\n")
buf.write("\n")
buf.write("This is a message with multiple parts in MIME format.\n")
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write("\n")
buf.write(config.forward_note)
if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
buf.write("\n")
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write("Content-Disposition: inline; filename=\"queue-list.txt\"\n")
buf.write("\n")
for queue_id, (mail, _) in mails.items():
mail.print(verbose=True, out=buf)
for queue_id, (_, mail_message) in mails.items():
basename = queue_id.replace('/', '_')
if mail_message is None:
# this should be very unlikely unless you try to forward messages from the active queue
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write(f"Content-Disposition: inline; filename=\"{basename}.txt\"\n")
buf.write("\n")
buf.write(f"Message {queue_id} couldn't be found (anymore); may have been sent.")
continue
buf.write(f"--{boundary}\n")
buf.write("Content-Type: message/rfc822\n")
buf.write(f"Content-Disposition: inline; filename=\"{basename}.eml\"\n")
buf.write("\n")
bin_buf.write(mail_message)
# ensure parts are terminated by newline
if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
buf.write("\n")
buf.write(f"--{boundary}--\n")
return bin_buf.getvalue()
class TrioParallelOrdered(typing.Generic[T]): class TrioParallelOrdered(typing.Generic[T]):
# used by trio_parallel_ordered below # used by trio_parallel_ordered below
@ -230,10 +319,42 @@ class Mail:
queue.append(mail) queue.append(mail)
return queue return queue
def print(self, *, verbose: bool, out: typing.TextIO = sys.stdout) -> None:
flag = CLI.QUEUE_FLAGS.get(self.queue_name, ' ')
if verbose:
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} {self.sender:<60s}",
file=out,
)
if not self.recipients:
print(f"{'':21}No recipients listed for this mail?", file=out)
for recpt in self.recipients:
print(f"{'':21}{recpt.address}", file=out)
if recpt.delay_reason:
print(f"{'':29}{recpt.delay_reason}", file=out)
else:
cnt_recpts = len(self.recipients)
if cnt_recpts:
last_recpt = self.recipients[-1].address
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
f"{self.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})",
file=out,
)
else:
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
f"{self.sender:<60s} (No recipients listed for this mail?)",
file=out,
)
# abstract collection/cluster of postfix nodes (or just a single one) # abstract collection/cluster of postfix nodes (or just a single one)
class Source(abc.ABC): class Source(abc.ABC):
__slots__ = () __slots__ = ('config',)
def __init__(self, *, config: Config) -> None:
self.config = config
# list of server names in collection # list of server names in collection
@abc.abstractmethod @abc.abstractmethod
@ -331,10 +452,17 @@ class Source(abc.ABC):
""" """
await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues) await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)
@abc.abstractmethod
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
raise NotImplementedError()
class LocalSource(Source): class LocalSource(Source):
__slots__ = () __slots__ = ()
def __init__(self, *, config: Config) -> None:
super().__init__(config=config)
def server_list(self) -> list[str]: def server_list(self) -> list[str]:
return [socket.gethostname()] return [socket.gethostname()]
@ -390,12 +518,26 @@ class LocalSource(Source):
data = ''.join(f'{msg}\n' for msg in queue_ids) data = ''.join(f'{msg}\n' for msg in queue_ids)
await trio.run_process(cmd, stdin=data.encode()) await trio.run_process(cmd, stdin=data.encode())
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
for mail in mails:
(mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
if mail_message is None:
print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
mail_messages[mail.queue_id] = (mail, mail_message)
if not mail_messages:
print("No mails found to forward.")
return
message = format_mail_forward(config=self.config, mails=mail_messages, hostname=socket.getfqdn(), recipients=recipients)
cmd = ['sendmail', '-t', '-bm', '-f', '']
await trio.run_process(cmd, stdin=message)
class RemoteSource(Source): class RemoteSource(Source):
__slots__ = ('remotes',) __slots__ = ('remotes',)
def __init__(self, remotes: dict[str, str]) -> None: def __init__(self, *, config: Config, remotes: dict[str, str]) -> None:
super().__init__() super().__init__(config=config)
self.remotes = remotes self.remotes = remotes
def server_list(self) -> list[str]: def server_list(self) -> list[str]:
@ -524,6 +666,37 @@ class RemoteSource(Source):
data = ''.join(f'{msg}\n' for msg in ids) data = ''.join(f'{msg}\n' for msg in ids)
nursery.start_soon(run_host, remote_host, data) nursery.start_soon(run_host, remote_host, data)
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
mails_by_id = {
mail.queue_id: mail
for mail in mails
}
if not mails_by_id:
print("No mails found to forward.")
async def run_host(host: str, remote_host: str, queue_ids: list[str]) -> None:
mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
for queue_id in queue_ids:
mail = mails_by_id[f'{host}/{queue_id}']
(mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
if mail_message is None:
print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
mail_messages[mail.queue_id] = (mail, mail_message)
if not mails:
return
message = format_mail_forward(config=self.config, mails=mail_messages, hostname=remote_host, recipients=recipients)
cmd = ['sendmail', '-t', '-bm', '-f', '']
cmd = ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)]
await trio.run_process(cmd, stdin=message)
by_host = self.aggregate_by_host(queue_ids=mails_by_id.keys(), allow_all=True)
async with trio.open_nursery() as nursery:
for host, ids in by_host.items():
remote_host = self.remotes[host]
nursery.start_soon(run_host, host, remote_host, ids)
class UserActivityStats: class UserActivityStats:
__slots__ = ('count_mails', 'related_mails', 'cut_off') __slots__ = ('count_mails', 'related_mails', 'cut_off')
@ -1334,32 +1507,6 @@ class CLI:
QueueName.DEFERRED: ' ', QueueName.DEFERRED: ' ',
} }
def display_list_item(self, *, item: Mail, verbose: bool) -> None:
flag = CLI.QUEUE_FLAGS.get(item.queue_name, ' ')
if verbose:
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} {item.sender:<60s}"
)
if not item.recipients:
print(f"{'':21}No recipients listed for this mail?")
for recpt in item.recipients:
print(f"{'':21}{recpt.address}")
if recpt.delay_reason:
print(f"{'':29}{recpt.delay_reason}")
else:
cnt_recpts = len(item.recipients)
if cnt_recpts:
last_recpt = item.recipients[-1].address
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} "
f"{item.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})"
)
else:
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} "
f"{item.sender:<60s} (No recipients listed for this mail?)"
)
async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None: async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None:
flt: AndFilter flt: AndFilter
if args: if args:
@ -1380,7 +1527,7 @@ class CLI:
else: else:
show_filter = flt show_filter = flt
for item in show_filter.filter(await self.source.get_list()): for item in show_filter.filter(await self.source.get_list()):
self.display_list_item(item=item, verbose=verbose) item.print(verbose=verbose)
async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]: async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]:
mails = { mails = {
@ -1392,7 +1539,7 @@ class CLI:
for queue_id in given_ids: for queue_id in given_ids:
item = mails.get(queue_id, None) item = mails.get(queue_id, None)
if item: if item:
self.display_list_item(item=item, verbose=False) item.print(verbose=False)
verified_list.append(item) verified_list.append(item)
else: else:
missing.append(queue_id) missing.append(queue_id)
@ -1574,7 +1721,7 @@ class CLI:
async def delete_impl(self, flt: Filter) -> None: async def delete_impl(self, flt: Filter) -> None:
mails = list(flt.filter(await self.source.get_list())) mails = list(flt.filter(await self.source.get_list()))
for item in mails: for item in mails:
self.display_list_item(item=item, verbose=False) item.print(verbose=False)
ack = input_ack("Really delete those mails (y/N)? ") ack = input_ack("Really delete those mails (y/N)? ")
if ack.lower()[:1] != "y": if ack.lower()[:1] != "y":
return return
@ -1681,7 +1828,7 @@ class CLI:
flt = self.current_filter flt = self.current_filter
mails = list(flt.filter(await self.source.get_list())) mails = list(flt.filter(await self.source.get_list()))
for item in mails: for item in mails:
self.display_list_item(item=item, verbose=False) item.print(verbose=False)
ack = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ") ack = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ")
if ack.lower()[:1] != "y": if ack.lower()[:1] != "y":
return return
@ -1707,7 +1854,7 @@ class CLI:
flt = self.current_filter flt = self.current_filter
mails = list(flt.filter(await self.source.get_list())) mails = list(flt.filter(await self.source.get_list()))
for item in mails: for item in mails:
self.display_list_item(item=item, verbose=False) item.print(verbose=False)
ack = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ") ack = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ")
if ack.lower()[:1] != "y": if ack.lower()[:1] != "y":
return return
@ -1733,7 +1880,7 @@ class CLI:
flt = self.current_filter flt = self.current_filter
mails = list(flt.filter(await self.source.get_list())) mails = list(flt.filter(await self.source.get_list()))
for item in mails: for item in mails:
self.display_list_item(item=item, verbose=False) item.print(verbose=False)
ack = input_ack("Really expire those mails (y/N)? ") ack = input_ack("Really expire those mails (y/N)? ")
if ack.lower()[:1] != "y": if ack.lower()[:1] != "y":
return return
@ -1749,6 +1896,26 @@ class CLI:
return return
await self.source.expire((mail.queue_id for mail in mails)) await self.source.expire((mail.queue_id for mail in mails))
@register_command(name="forward-to")
async def forward_to(self, args: str) -> None:
"""
Forward mails to given recipients.
Specify recipients as simple mail addresses (e.g. `alice@example.com`, not `Alice <alice.example.com>`).
"""
if not args:
print("No recipients given")
return
if not self.prompt_if_empty_filter():
return
mails = list(self.current_filter.filter(await self.source.get_list()))
for item in mails:
item.print(verbose=False)
ack = input_ack("Really forward those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
await self.source.forward_to(mails=mails, recipients=args.split())
async def prompt(self) -> None: async def prompt(self) -> None:
# main loop of CLI # main loop of CLI
while True: while True:

View File

@ -12,3 +12,14 @@ remote-sources:
# prefix with "alias:" if you don't want the first DNS label to be the alias # prefix with "alias:" if you don't want the first DNS label to be the alias
- orgmx1:mx1.example.org - orgmx1:mx1.example.org
- orgmx2:mx2.example.org - orgmx2:mx2.example.org
forward-note: |
The postmaster decided you should know about the following mails in
the queue, probably because you need to fix something in your setup.
The postmaster will probably delete the queued message afterwards,
or have them expire (i.e. send a bounce message about failed delivery
to the sender).
forward-sender: MAILER-DAEMON # @$mx gets appended if it doesn't contain an '@'
forward-sender-name: Mail Delivery System