pqm version 0.3

-----BEGIN PGP SIGNATURE-----
 
 iQJYBAABCgBCFiEEcdms641aWv8vJSXMIcx4mUG+20gFAmWdJxEkHHN0ZWZhbi5i
 dWVobGVyQHRpay51bmktc3R1dHRnYXJ0LmRlAAoJECHMeJlBvttItG4P/25u1tXF
 mf34EyXO7K+vnYUfRl1vTUuE+ipoe8ISUGhCCGHlTE5TEuKbQAbQer3sTT6jVDhk
 L+wYfRlycoPcfVLu/q+dscOyN3nTEgTiS4ToKCyXw7IUwajZU0N82JeD2z80ENzi
 Nf2ZiG2FmcIdQjpFtRHvl9Npq930txyXDqGBV/hH4fDTSS3d44csyCeOsXW95Gub
 iajGsuRaQBAgvXhXFVjLpK2AjS4U/XotLK0NhFoQ+aclhiMqfkpn/Pi8IXmZWNQg
 Twyt5PfDm1FJwUaO0Ser111iFu9T2nGtCPMIqeZ7WJJLEDk3o211drfFSZ/P2ibd
 IzHqoszszcvkP+7JknpJUcOXt9XNP5O7DCRQntjIlNLYEsDvqiYU1aqhSN7nFdCM
 02JG50mLOwRD+USbT55Mz1gDlY1mUuZ7X4GtRrH7CgkCJQIdspBxEDGOaqp2r+dh
 iphQPXDfDfkSsJECdTK5OJkPhHmqQgr9eOcWaen+33ShsBrct2B0dVtM9/nP6Xip
 ibsLYF2dyi7NblL2I5K/5cB+PauuSqJ/erF+5sg3YRVg3bh7GhGtgRh7XDTVaTbr
 Pn4eGgPy14me1Q2Be8toirdz3fJofeTGlO3i6aZb5TCJ3A/qMaRE4+vByxDsNN8A
 el0Hcu3nzxpItIZtMWYKtGoPq+BdyqYLHkk/
 =R0Oe
 -----END PGP SIGNATURE-----

Merge tag 'v0.3' into debian

pqm version 0.3
This commit is contained in:
Stefan Bühler 2024-01-09 12:00:10 +01:00
commit 16af2e2e8c
2 changed files with 222 additions and 44 deletions

255
pqm
View File

@ -14,6 +14,7 @@ import datetime
import enum
import functools
import inspect
import io
import json
import os
import os.path
@ -28,6 +29,7 @@ import sys
import traceback
import trio
import typing
import uuid
import yaml
T = typing.TypeVar('T')
@ -35,11 +37,25 @@ T = typing.TypeVar('T')
@dataclasses.dataclass(slots=True)
class Config:
DEFAULT_FORWARD_NOTE: str = """\
The postmaster decided you should know about the following mails in
the queue, probably because you need to fix something in your setup.
The postmaster will probably delete the queued message afterwards,
or have them expire (i.e. send a bounce message about failed delivery
to the sender).
"""
# if not remote_sources are configured -> use local queue
remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
forward_note: str = DEFAULT_FORWARD_NOTE
forward_sender: str = 'MAILER-DAEMON'
forward_sender_name: str = 'Mail Delivery System'
@staticmethod
def load(filename: str = '') -> Config:
config = Config()
if not filename:
default_paths = [
os.path.expanduser('~/.config/pqm.yaml'),
@ -51,13 +67,13 @@ class Config:
break
else:
# no configfile -> default config
return Config()
return config
with open(filename) as file:
data = yaml.safe_load(file)
assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"
rs_list = data.get('remote-sources', [])
rs_list = data.pop('remote-sources', [])
assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}"
remote_sources: dict[str, str] = {}
for entry in rs_list:
assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}"
if ':' in entry:
@ -66,16 +82,33 @@ class Config:
target = entry
alias = entry.split('.', maxsplit=1)[0]
assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
if alias in remote_sources:
raise ValueError(f'Duplicate alias {alias!r} in remote-sources (have: {remote_sources[alias]!r}, new: {target!r}')
remote_sources[alias] = target
return Config(remote_sources=remote_sources)
if alias in config.remote_sources:
raise ValueError(
f'Duplicate alias {alias!r} in remote-sources'
f' (have: {config.remote_sources[alias]!r}, new: {target!r}',
)
config.remote_sources[alias] = target
config.forward_note = data.pop('forward-note', config.forward_note)
config.forward_sender = data.pop('forward-sender', config.forward_sender)
config.forward_sender_name = data.pop('forward-sender-name', config.forward_sender_name)
assert not data, f"Unknown config options: {data}"
return config
def forward_build_from(self, *, hostname: str) -> str:
if '@' in self.forward_sender:
sender = self.forward_sender
else:
sender = f'{self.forward_sender}@{hostname}'
return f'{self.forward_sender_name} <{sender}>'
def source(self) -> Source:
if self.remote_sources:
return RemoteSource(remotes=self.remote_sources)
return RemoteSource(config=self, remotes=self.remote_sources)
else:
return LocalSource()
return LocalSource(config=self)
def input_ack(prompt: str) -> str:
@ -98,6 +131,62 @@ def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.C
return wrapped
def format_mail_forward(
*,
config: Config,
mails: dict[str, tuple[Mail, bytes | None]],
hostname: str,
recipients: typing.Iterable[str],
) -> bytes:
bin_buf = io.BytesIO()
buf = io.TextIOWrapper(bin_buf, write_through=True)
buf.write(f'From: {config.forward_build_from(hostname=hostname)}\n')
for recp in recipients:
buf.write(f'To: <{recp}>\n')
buf.write(f"Subject: Deferred mails on {hostname}\n")
buf.write("MIME-Version: 1.0\n")
boundary = f"postmaster-forward-{uuid.uuid4().hex}"
buf.write(f"Content-Type: multipart/mixed; boundary=\"{boundary}\"\n")
buf.write("\n")
buf.write("This is a message with multiple parts in MIME format.\n")
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write("\n")
buf.write(config.forward_note)
if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
buf.write("\n")
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write("Content-Disposition: inline; filename=\"queue-list.txt\"\n")
buf.write("\n")
for queue_id, (mail, _) in mails.items():
mail.print(verbose=True, out=buf)
for queue_id, (_, mail_message) in mails.items():
basename = queue_id.replace('/', '_')
if mail_message is None:
# this should be very unlikely unless you try to forward messages from the active queue
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write(f"Content-Disposition: inline; filename=\"{basename}.txt\"\n")
buf.write("\n")
buf.write(f"Message {queue_id} couldn't be found (anymore); may have been sent.")
continue
buf.write(f"--{boundary}\n")
buf.write("Content-Type: message/rfc822\n")
buf.write(f"Content-Disposition: inline; filename=\"{basename}.eml\"\n")
buf.write("\n")
bin_buf.write(mail_message)
# ensure parts are terminated by newline
if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
buf.write("\n")
buf.write(f"--{boundary}--\n")
return bin_buf.getvalue()
class TrioParallelOrdered(typing.Generic[T]):
# used by trio_parallel_ordered below
@ -230,10 +319,42 @@ class Mail:
queue.append(mail)
return queue
def print(self, *, verbose: bool, out: typing.TextIO = sys.stdout) -> None:
flag = CLI.QUEUE_FLAGS.get(self.queue_name, ' ')
if verbose:
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} {self.sender:<60s}",
file=out,
)
if not self.recipients:
print(f"{'':21}No recipients listed for this mail?", file=out)
for recpt in self.recipients:
print(f"{'':21}{recpt.address}", file=out)
if recpt.delay_reason:
print(f"{'':29}{recpt.delay_reason}", file=out)
else:
cnt_recpts = len(self.recipients)
if cnt_recpts:
last_recpt = self.recipients[-1].address
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
f"{self.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})",
file=out,
)
else:
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
f"{self.sender:<60s} (No recipients listed for this mail?)",
file=out,
)
# abstract collection/cluster of postfix nodes (or just a single one)
class Source(abc.ABC):
__slots__ = ()
__slots__ = ('config',)
def __init__(self, *, config: Config) -> None:
self.config = config
# list of server names in collection
@abc.abstractmethod
@ -331,10 +452,17 @@ class Source(abc.ABC):
"""
await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)
@abc.abstractmethod
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
raise NotImplementedError()
class LocalSource(Source):
__slots__ = ()
def __init__(self, *, config: Config) -> None:
super().__init__(config=config)
def server_list(self) -> list[str]:
return [socket.gethostname()]
@ -390,12 +518,26 @@ class LocalSource(Source):
data = ''.join(f'{msg}\n' for msg in queue_ids)
await trio.run_process(cmd, stdin=data.encode())
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
for mail in mails:
(mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
if mail_message is None:
print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
mail_messages[mail.queue_id] = (mail, mail_message)
if not mail_messages:
print("No mails found to forward.")
return
message = format_mail_forward(config=self.config, mails=mail_messages, hostname=socket.getfqdn(), recipients=recipients)
cmd = ['sendmail', '-t', '-bm', '-f', '']
await trio.run_process(cmd, stdin=message)
class RemoteSource(Source):
__slots__ = ('remotes',)
def __init__(self, remotes: dict[str, str]) -> None:
super().__init__()
def __init__(self, *, config: Config, remotes: dict[str, str]) -> None:
super().__init__(config=config)
self.remotes = remotes
def server_list(self) -> list[str]:
@ -524,6 +666,37 @@ class RemoteSource(Source):
data = ''.join(f'{msg}\n' for msg in ids)
nursery.start_soon(run_host, remote_host, data)
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
mails_by_id = {
mail.queue_id: mail
for mail in mails
}
if not mails_by_id:
print("No mails found to forward.")
async def run_host(host: str, remote_host: str, queue_ids: list[str]) -> None:
mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
for queue_id in queue_ids:
mail = mails_by_id[f'{host}/{queue_id}']
(mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
if mail_message is None:
print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
mail_messages[mail.queue_id] = (mail, mail_message)
if not mails:
return
message = format_mail_forward(config=self.config, mails=mail_messages, hostname=remote_host, recipients=recipients)
cmd = ['sendmail', '-t', '-bm', '-f', '']
cmd = ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)]
await trio.run_process(cmd, stdin=message)
by_host = self.aggregate_by_host(queue_ids=mails_by_id.keys(), allow_all=True)
async with trio.open_nursery() as nursery:
for host, ids in by_host.items():
remote_host = self.remotes[host]
nursery.start_soon(run_host, host, remote_host, ids)
class UserActivityStats:
__slots__ = ('count_mails', 'related_mails', 'cut_off')
@ -1334,32 +1507,6 @@ class CLI:
QueueName.DEFERRED: ' ',
}
def display_list_item(self, *, item: Mail, verbose: bool) -> None:
flag = CLI.QUEUE_FLAGS.get(item.queue_name, ' ')
if verbose:
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} {item.sender:<60s}"
)
if not item.recipients:
print(f"{'':21}No recipients listed for this mail?")
for recpt in item.recipients:
print(f"{'':21}{recpt.address}")
if recpt.delay_reason:
print(f"{'':29}{recpt.delay_reason}")
else:
cnt_recpts = len(item.recipients)
if cnt_recpts:
last_recpt = item.recipients[-1].address
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} "
f"{item.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})"
)
else:
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} "
f"{item.sender:<60s} (No recipients listed for this mail?)"
)
async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None:
flt: AndFilter
if args:
@ -1380,7 +1527,7 @@ class CLI:
else:
show_filter = flt
for item in show_filter.filter(await self.source.get_list()):
self.display_list_item(item=item, verbose=verbose)
item.print(verbose=verbose)
async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]:
mails = {
@ -1392,7 +1539,7 @@ class CLI:
for queue_id in given_ids:
item = mails.get(queue_id, None)
if item:
self.display_list_item(item=item, verbose=False)
item.print(verbose=False)
verified_list.append(item)
else:
missing.append(queue_id)
@ -1574,7 +1721,7 @@ class CLI:
async def delete_impl(self, flt: Filter) -> None:
mails = list(flt.filter(await self.source.get_list()))
for item in mails:
self.display_list_item(item=item, verbose=False)
item.print(verbose=False)
ack = input_ack("Really delete those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
@ -1681,7 +1828,7 @@ class CLI:
flt = self.current_filter
mails = list(flt.filter(await self.source.get_list()))
for item in mails:
self.display_list_item(item=item, verbose=False)
item.print(verbose=False)
ack = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
@ -1707,7 +1854,7 @@ class CLI:
flt = self.current_filter
mails = list(flt.filter(await self.source.get_list()))
for item in mails:
self.display_list_item(item=item, verbose=False)
item.print(verbose=False)
ack = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
@ -1733,7 +1880,7 @@ class CLI:
flt = self.current_filter
mails = list(flt.filter(await self.source.get_list()))
for item in mails:
self.display_list_item(item=item, verbose=False)
item.print(verbose=False)
ack = input_ack("Really expire those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
@ -1749,6 +1896,26 @@ class CLI:
return
await self.source.expire((mail.queue_id for mail in mails))
@register_command(name="forward-to")
async def forward_to(self, args: str) -> None:
"""
Forward mails to given recipients.
Specify recipients as simple mail addresses (e.g. `alice@example.com`, not `Alice <alice.example.com>`).
"""
if not args:
print("No recipients given")
return
if not self.prompt_if_empty_filter():
return
mails = list(self.current_filter.filter(await self.source.get_list()))
for item in mails:
item.print(verbose=False)
ack = input_ack("Really forward those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
await self.source.forward_to(mails=mails, recipients=args.split())
async def prompt(self) -> None:
# main loop of CLI
while True:

View File

@ -12,3 +12,14 @@ remote-sources:
# prefix with "alias:" if you don't want the first DNS label to be the alias
- orgmx1:mx1.example.org
- orgmx2:mx2.example.org
forward-note: |
The postmaster decided you should know about the following mails in
the queue, probably because you need to fix something in your setup.
The postmaster will probably delete the queued message afterwards,
or have them expire (i.e. send a bounce message about failed delivery
to the sender).
forward-sender: MAILER-DAEMON # @$mx gets appended if it doesn't contain an '@'
forward-sender-name: Mail Delivery System