Compare commits

..

No commits in common. "8677051073d948e54b48171415952b4154dbc34f" and "0e016c9ec2ed88f7a8acb1a251aaeb1a091273a9" have entirely different histories.

3 changed files with 44 additions and 229 deletions

7
debian/changelog vendored
View File

@ -1,10 +1,3 @@
pqm (0.3-1) unstable; urgency=medium
* move display_list_item fn to Mail.print
* add "forward-to" command to forward mails in the queue and their verbose state as attachment to given recipients
-- Stefan Bühler <stefan.buehler@tik.uni-stuttgart.de> Tue, 09 Jan 2024 12:00:39 +0100
pqm (0.2-1) unstable; urgency=medium
* fix json stream decode performance issue

255
pqm
View File

@ -14,7 +14,6 @@ import datetime
import enum
import functools
import inspect
import io
import json
import os
import os.path
@ -29,7 +28,6 @@ import sys
import traceback
import trio
import typing
import uuid
import yaml
T = typing.TypeVar('T')
@ -37,25 +35,11 @@ T = typing.TypeVar('T')
@dataclasses.dataclass(slots=True)
class Config:
DEFAULT_FORWARD_NOTE: str = """\
The postmaster decided you should know about the following mails in
the queue, probably because you need to fix something in your setup.
The postmaster will probably delete the queued message afterwards,
or have them expire (i.e. send a bounce message about failed delivery
to the sender).
"""
# if not remote_sources are configured -> use local queue
remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
forward_note: str = DEFAULT_FORWARD_NOTE
forward_sender: str = 'MAILER-DAEMON'
forward_sender_name: str = 'Mail Delivery System'
@staticmethod
def load(filename: str = '') -> Config:
config = Config()
if not filename:
default_paths = [
os.path.expanduser('~/.config/pqm.yaml'),
@ -67,13 +51,13 @@ to the sender).
break
else:
# no configfile -> default config
return config
return Config()
with open(filename) as file:
data = yaml.safe_load(file)
assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"
rs_list = data.pop('remote-sources', [])
rs_list = data.get('remote-sources', [])
assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}"
remote_sources: dict[str, str] = {}
for entry in rs_list:
assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}"
if ':' in entry:
@ -82,33 +66,16 @@ to the sender).
target = entry
alias = entry.split('.', maxsplit=1)[0]
assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
if alias in config.remote_sources:
raise ValueError(
f'Duplicate alias {alias!r} in remote-sources'
f' (have: {config.remote_sources[alias]!r}, new: {target!r}',
)
config.remote_sources[alias] = target
config.forward_note = data.pop('forward-note', config.forward_note)
config.forward_sender = data.pop('forward-sender', config.forward_sender)
config.forward_sender_name = data.pop('forward-sender-name', config.forward_sender_name)
assert not data, f"Unknown config options: {data}"
return config
def forward_build_from(self, *, hostname: str) -> str:
if '@' in self.forward_sender:
sender = self.forward_sender
else:
sender = f'{self.forward_sender}@{hostname}'
return f'{self.forward_sender_name} <{sender}>'
if alias in remote_sources:
raise ValueError(f'Duplicate alias {alias!r} in remote-sources (have: {remote_sources[alias]!r}, new: {target!r}')
remote_sources[alias] = target
return Config(remote_sources=remote_sources)
def source(self) -> Source:
if self.remote_sources:
return RemoteSource(config=self, remotes=self.remote_sources)
return RemoteSource(remotes=self.remote_sources)
else:
return LocalSource(config=self)
return LocalSource()
def input_ack(prompt: str) -> str:
@ -131,62 +98,6 @@ def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.C
return wrapped
def format_mail_forward(
*,
config: Config,
mails: dict[str, tuple[Mail, bytes | None]],
hostname: str,
recipients: typing.Iterable[str],
) -> bytes:
bin_buf = io.BytesIO()
buf = io.TextIOWrapper(bin_buf, write_through=True)
buf.write(f'From: {config.forward_build_from(hostname=hostname)}\n')
for recp in recipients:
buf.write(f'To: <{recp}>\n')
buf.write(f"Subject: Deferred mails on {hostname}\n")
buf.write("MIME-Version: 1.0\n")
boundary = f"postmaster-forward-{uuid.uuid4().hex}"
buf.write(f"Content-Type: multipart/mixed; boundary=\"{boundary}\"\n")
buf.write("\n")
buf.write("This is a message with multiple parts in MIME format.\n")
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write("\n")
buf.write(config.forward_note)
if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
buf.write("\n")
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write("Content-Disposition: inline; filename=\"queue-list.txt\"\n")
buf.write("\n")
for queue_id, (mail, _) in mails.items():
mail.print(verbose=True, out=buf)
for queue_id, (_, mail_message) in mails.items():
basename = queue_id.replace('/', '_')
if mail_message is None:
# this should be very unlikely unless you try to forward messages from the active queue
buf.write(f"--{boundary}\n")
buf.write("Content-Type: text/plain\n")
buf.write(f"Content-Disposition: inline; filename=\"{basename}.txt\"\n")
buf.write("\n")
buf.write(f"Message {queue_id} couldn't be found (anymore); may have been sent.")
continue
buf.write(f"--{boundary}\n")
buf.write("Content-Type: message/rfc822\n")
buf.write(f"Content-Disposition: inline; filename=\"{basename}.eml\"\n")
buf.write("\n")
bin_buf.write(mail_message)
# ensure parts are terminated by newline
if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
buf.write("\n")
buf.write(f"--{boundary}--\n")
return bin_buf.getvalue()
class TrioParallelOrdered(typing.Generic[T]):
# used by trio_parallel_ordered below
@ -319,42 +230,10 @@ class Mail:
queue.append(mail)
return queue
def print(self, *, verbose: bool, out: typing.TextIO = sys.stdout) -> None:
flag = CLI.QUEUE_FLAGS.get(self.queue_name, ' ')
if verbose:
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} {self.sender:<60s}",
file=out,
)
if not self.recipients:
print(f"{'':21}No recipients listed for this mail?", file=out)
for recpt in self.recipients:
print(f"{'':21}{recpt.address}", file=out)
if recpt.delay_reason:
print(f"{'':29}{recpt.delay_reason}", file=out)
else:
cnt_recpts = len(self.recipients)
if cnt_recpts:
last_recpt = self.recipients[-1].address
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
f"{self.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})",
file=out,
)
else:
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
f"{self.sender:<60s} (No recipients listed for this mail?)",
file=out,
)
# abstract collection/cluster of postfix nodes (or just a single one)
class Source(abc.ABC):
__slots__ = ('config',)
def __init__(self, *, config: Config) -> None:
self.config = config
__slots__ = ()
# list of server names in collection
@abc.abstractmethod
@ -452,17 +331,10 @@ class Source(abc.ABC):
"""
await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)
@abc.abstractmethod
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
raise NotImplementedError()
class LocalSource(Source):
__slots__ = ()
def __init__(self, *, config: Config) -> None:
super().__init__(config=config)
def server_list(self) -> list[str]:
return [socket.gethostname()]
@ -518,26 +390,12 @@ class LocalSource(Source):
data = ''.join(f'{msg}\n' for msg in queue_ids)
await trio.run_process(cmd, stdin=data.encode())
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
for mail in mails:
(mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
if mail_message is None:
print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
mail_messages[mail.queue_id] = (mail, mail_message)
if not mail_messages:
print("No mails found to forward.")
return
message = format_mail_forward(config=self.config, mails=mail_messages, hostname=socket.getfqdn(), recipients=recipients)
cmd = ['sendmail', '-t', '-bm', '-f', '']
await trio.run_process(cmd, stdin=message)
class RemoteSource(Source):
__slots__ = ('remotes',)
def __init__(self, *, config: Config, remotes: dict[str, str]) -> None:
super().__init__(config=config)
def __init__(self, remotes: dict[str, str]) -> None:
super().__init__()
self.remotes = remotes
def server_list(self) -> list[str]:
@ -666,37 +524,6 @@ class RemoteSource(Source):
data = ''.join(f'{msg}\n' for msg in ids)
nursery.start_soon(run_host, remote_host, data)
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
mails_by_id = {
mail.queue_id: mail
for mail in mails
}
if not mails_by_id:
print("No mails found to forward.")
async def run_host(host: str, remote_host: str, queue_ids: list[str]) -> None:
mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
for queue_id in queue_ids:
mail = mails_by_id[f'{host}/{queue_id}']
(mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
if mail_message is None:
print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
mail_messages[mail.queue_id] = (mail, mail_message)
if not mails:
return
message = format_mail_forward(config=self.config, mails=mail_messages, hostname=remote_host, recipients=recipients)
cmd = ['sendmail', '-t', '-bm', '-f', '']
cmd = ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)]
await trio.run_process(cmd, stdin=message)
by_host = self.aggregate_by_host(queue_ids=mails_by_id.keys(), allow_all=True)
async with trio.open_nursery() as nursery:
for host, ids in by_host.items():
remote_host = self.remotes[host]
nursery.start_soon(run_host, host, remote_host, ids)
class UserActivityStats:
__slots__ = ('count_mails', 'related_mails', 'cut_off')
@ -1507,6 +1334,32 @@ class CLI:
QueueName.DEFERRED: ' ',
}
def display_list_item(self, *, item: Mail, verbose: bool) -> None:
flag = CLI.QUEUE_FLAGS.get(item.queue_name, ' ')
if verbose:
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} {item.sender:<60s}"
)
if not item.recipients:
print(f"{'':21}No recipients listed for this mail?")
for recpt in item.recipients:
print(f"{'':21}{recpt.address}")
if recpt.delay_reason:
print(f"{'':29}{recpt.delay_reason}")
else:
cnt_recpts = len(item.recipients)
if cnt_recpts:
last_recpt = item.recipients[-1].address
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} "
f"{item.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})"
)
else:
print(
f"{item.queue_id + flag:<17s} {item.message_size:>8d} {item.arrival_time:%a %b %d %H:%M:%S} "
f"{item.sender:<60s} (No recipients listed for this mail?)"
)
async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None:
flt: AndFilter
if args:
@ -1527,7 +1380,7 @@ class CLI:
else:
show_filter = flt
for item in show_filter.filter(await self.source.get_list()):
item.print(verbose=verbose)
self.display_list_item(item=item, verbose=verbose)
async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]:
mails = {
@ -1539,7 +1392,7 @@ class CLI:
for queue_id in given_ids:
item = mails.get(queue_id, None)
if item:
item.print(verbose=False)
self.display_list_item(item=item, verbose=False)
verified_list.append(item)
else:
missing.append(queue_id)
@ -1721,7 +1574,7 @@ class CLI:
async def delete_impl(self, flt: Filter) -> None:
mails = list(flt.filter(await self.source.get_list()))
for item in mails:
item.print(verbose=False)
self.display_list_item(item=item, verbose=False)
ack = input_ack("Really delete those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
@ -1828,7 +1681,7 @@ class CLI:
flt = self.current_filter
mails = list(flt.filter(await self.source.get_list()))
for item in mails:
item.print(verbose=False)
self.display_list_item(item=item, verbose=False)
ack = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
@ -1854,7 +1707,7 @@ class CLI:
flt = self.current_filter
mails = list(flt.filter(await self.source.get_list()))
for item in mails:
item.print(verbose=False)
self.display_list_item(item=item, verbose=False)
ack = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
@ -1880,7 +1733,7 @@ class CLI:
flt = self.current_filter
mails = list(flt.filter(await self.source.get_list()))
for item in mails:
item.print(verbose=False)
self.display_list_item(item=item, verbose=False)
ack = input_ack("Really expire those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
@ -1896,26 +1749,6 @@ class CLI:
return
await self.source.expire((mail.queue_id for mail in mails))
@register_command(name="forward-to")
async def forward_to(self, args: str) -> None:
"""
Forward mails to given recipients.
Specify recipients as simple mail addresses (e.g. `alice@example.com`, not `Alice <alice.example.com>`).
"""
if not args:
print("No recipients given")
return
if not self.prompt_if_empty_filter():
return
mails = list(self.current_filter.filter(await self.source.get_list()))
for item in mails:
item.print(verbose=False)
ack = input_ack("Really forward those mails (y/N)? ")
if ack.lower()[:1] != "y":
return
await self.source.forward_to(mails=mails, recipients=args.split())
async def prompt(self) -> None:
# main loop of CLI
while True:

View File

@ -12,14 +12,3 @@ remote-sources:
# prefix with "alias:" if you don't want the first DNS label to be the alias
- orgmx1:mx1.example.org
- orgmx2:mx2.example.org
forward-note: |
The postmaster decided you should know about the following mails in
the queue, probably because you need to fix something in your setup.
The postmaster will probably delete the queued message afterwards,
or have them expire (i.e. send a bounce message about failed delivery
to the sender).
forward-sender: MAILER-DAEMON # @$mx gets appended if it doesn't contain an '@'
forward-sender-name: Mail Delivery System