add "forward-to" command to forward mails in the queue and their verbose state as attachment to given recipients
This commit is contained in:
parent
ce1aad5454
commit
2c0f1e2a2c
188
pqm
188
pqm
@ -14,6 +14,7 @@ import datetime
|
|||||||
import enum
|
import enum
|
||||||
import functools
|
import functools
|
||||||
import inspect
|
import inspect
|
||||||
|
import io
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import os.path
|
import os.path
|
||||||
@ -28,6 +29,7 @@ import sys
|
|||||||
import traceback
|
import traceback
|
||||||
import trio
|
import trio
|
||||||
import typing
|
import typing
|
||||||
|
import uuid
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
T = typing.TypeVar('T')
|
T = typing.TypeVar('T')
|
||||||
@ -35,11 +37,25 @@ T = typing.TypeVar('T')
|
|||||||
|
|
||||||
@dataclasses.dataclass(slots=True)
|
@dataclasses.dataclass(slots=True)
|
||||||
class Config:
|
class Config:
|
||||||
|
DEFAULT_FORWARD_NOTE: str = """\
|
||||||
|
The postmaster decided you should know about the following mails in
|
||||||
|
the queue, probably because you need to fix something in your setup.
|
||||||
|
|
||||||
|
The postmaster will probably delete the queued message afterwards,
|
||||||
|
or have them expire (i.e. send a bounce message about failed delivery
|
||||||
|
to the sender).
|
||||||
|
"""
|
||||||
|
|
||||||
# if not remote_sources are configured -> use local queue
|
# if not remote_sources are configured -> use local queue
|
||||||
remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
|
remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
|
||||||
|
forward_note: str = DEFAULT_FORWARD_NOTE
|
||||||
|
forward_sender: str = 'MAILER-DAEMON'
|
||||||
|
forward_sender_name: str = 'Mail Delivery System'
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def load(filename: str = '') -> Config:
|
def load(filename: str = '') -> Config:
|
||||||
|
config = Config()
|
||||||
|
|
||||||
if not filename:
|
if not filename:
|
||||||
default_paths = [
|
default_paths = [
|
||||||
os.path.expanduser('~/.config/pqm.yaml'),
|
os.path.expanduser('~/.config/pqm.yaml'),
|
||||||
@ -51,13 +67,13 @@ class Config:
|
|||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
# no configfile -> default config
|
# no configfile -> default config
|
||||||
return Config()
|
return config
|
||||||
with open(filename) as file:
|
with open(filename) as file:
|
||||||
data = yaml.safe_load(file)
|
data = yaml.safe_load(file)
|
||||||
assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"
|
assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"
|
||||||
rs_list = data.get('remote-sources', [])
|
|
||||||
|
rs_list = data.pop('remote-sources', [])
|
||||||
assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}"
|
assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}"
|
||||||
remote_sources: dict[str, str] = {}
|
|
||||||
for entry in rs_list:
|
for entry in rs_list:
|
||||||
assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}"
|
assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}"
|
||||||
if ':' in entry:
|
if ':' in entry:
|
||||||
@ -66,16 +82,33 @@ class Config:
|
|||||||
target = entry
|
target = entry
|
||||||
alias = entry.split('.', maxsplit=1)[0]
|
alias = entry.split('.', maxsplit=1)[0]
|
||||||
assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
|
assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
|
||||||
if alias in remote_sources:
|
if alias in config.remote_sources:
|
||||||
raise ValueError(f'Duplicate alias {alias!r} in remote-sources (have: {remote_sources[alias]!r}, new: {target!r}')
|
raise ValueError(
|
||||||
remote_sources[alias] = target
|
f'Duplicate alias {alias!r} in remote-sources'
|
||||||
return Config(remote_sources=remote_sources)
|
f' (have: {config.remote_sources[alias]!r}, new: {target!r}',
|
||||||
|
)
|
||||||
|
config.remote_sources[alias] = target
|
||||||
|
|
||||||
|
config.forward_note = data.pop('forward-note', config.forward_note)
|
||||||
|
config.forward_sender = data.pop('forward-sender', config.forward_sender)
|
||||||
|
config.forward_sender_name = data.pop('forward-sender-name', config.forward_sender_name)
|
||||||
|
|
||||||
|
assert not data, f"Unknown config options: {data}"
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
|
def forward_build_from(self, *, hostname: str) -> str:
|
||||||
|
if '@' in self.forward_sender:
|
||||||
|
sender = self.forward_sender
|
||||||
|
else:
|
||||||
|
sender = f'{self.forward_sender}@{hostname}'
|
||||||
|
return f'{self.forward_sender_name} <{sender}>'
|
||||||
|
|
||||||
def source(self) -> Source:
|
def source(self) -> Source:
|
||||||
if self.remote_sources:
|
if self.remote_sources:
|
||||||
return RemoteSource(remotes=self.remote_sources)
|
return RemoteSource(config=self, remotes=self.remote_sources)
|
||||||
else:
|
else:
|
||||||
return LocalSource()
|
return LocalSource(config=self)
|
||||||
|
|
||||||
|
|
||||||
def input_ack(prompt: str) -> str:
|
def input_ack(prompt: str) -> str:
|
||||||
@ -98,6 +131,62 @@ def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.C
|
|||||||
return wrapped
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
|
def format_mail_forward(
|
||||||
|
*,
|
||||||
|
config: Config,
|
||||||
|
mails: dict[str, tuple[Mail, bytes | None]],
|
||||||
|
hostname: str,
|
||||||
|
recipients: typing.Iterable[str],
|
||||||
|
) -> bytes:
|
||||||
|
bin_buf = io.BytesIO()
|
||||||
|
buf = io.TextIOWrapper(bin_buf, write_through=True)
|
||||||
|
buf.write(f'From: {config.forward_build_from(hostname=hostname)}\n')
|
||||||
|
for recp in recipients:
|
||||||
|
buf.write(f'To: <{recp}>\n')
|
||||||
|
buf.write(f"Subject: Deferred mails on {hostname}\n")
|
||||||
|
buf.write("MIME-Version: 1.0\n")
|
||||||
|
boundary = f"postmaster-forward-{uuid.uuid4().hex}"
|
||||||
|
buf.write(f"Content-Type: multipart/mixed; boundary=\"{boundary}\"\n")
|
||||||
|
buf.write("\n")
|
||||||
|
buf.write("This is a message with multiple parts in MIME format.\n")
|
||||||
|
|
||||||
|
buf.write(f"--{boundary}\n")
|
||||||
|
buf.write("Content-Type: text/plain\n")
|
||||||
|
buf.write("\n")
|
||||||
|
buf.write(config.forward_note)
|
||||||
|
if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
|
||||||
|
buf.write("\n")
|
||||||
|
|
||||||
|
buf.write(f"--{boundary}\n")
|
||||||
|
buf.write("Content-Type: text/plain\n")
|
||||||
|
buf.write("Content-Disposition: inline; filename=\"queue-list.txt\"\n")
|
||||||
|
buf.write("\n")
|
||||||
|
for queue_id, (mail, _) in mails.items():
|
||||||
|
mail.print(verbose=True, out=buf)
|
||||||
|
|
||||||
|
for queue_id, (_, mail_message) in mails.items():
|
||||||
|
basename = queue_id.replace('/', '_')
|
||||||
|
if mail_message is None:
|
||||||
|
# this should be very unlikely unless you try to forward messages from the active queue
|
||||||
|
buf.write(f"--{boundary}\n")
|
||||||
|
buf.write("Content-Type: text/plain\n")
|
||||||
|
buf.write(f"Content-Disposition: inline; filename=\"{basename}.txt\"\n")
|
||||||
|
buf.write("\n")
|
||||||
|
buf.write(f"Message {queue_id} couldn't be found (anymore); may have been sent.")
|
||||||
|
continue
|
||||||
|
buf.write(f"--{boundary}\n")
|
||||||
|
buf.write("Content-Type: message/rfc822\n")
|
||||||
|
buf.write(f"Content-Disposition: inline; filename=\"{basename}.eml\"\n")
|
||||||
|
buf.write("\n")
|
||||||
|
bin_buf.write(mail_message)
|
||||||
|
# ensure parts are terminated by newline
|
||||||
|
if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
|
||||||
|
buf.write("\n")
|
||||||
|
|
||||||
|
buf.write(f"--{boundary}--\n")
|
||||||
|
return bin_buf.getvalue()
|
||||||
|
|
||||||
|
|
||||||
class TrioParallelOrdered(typing.Generic[T]):
|
class TrioParallelOrdered(typing.Generic[T]):
|
||||||
# used by trio_parallel_ordered below
|
# used by trio_parallel_ordered below
|
||||||
|
|
||||||
@ -262,7 +351,10 @@ class Mail:
|
|||||||
|
|
||||||
# abstract collection/cluster of postfix nodes (or just a single one)
|
# abstract collection/cluster of postfix nodes (or just a single one)
|
||||||
class Source(abc.ABC):
|
class Source(abc.ABC):
|
||||||
__slots__ = ()
|
__slots__ = ('config',)
|
||||||
|
|
||||||
|
def __init__(self, *, config: Config) -> None:
|
||||||
|
self.config = config
|
||||||
|
|
||||||
# list of server names in collection
|
# list of server names in collection
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
@ -360,10 +452,17 @@ class Source(abc.ABC):
|
|||||||
"""
|
"""
|
||||||
await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)
|
await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
class LocalSource(Source):
|
class LocalSource(Source):
|
||||||
__slots__ = ()
|
__slots__ = ()
|
||||||
|
|
||||||
|
def __init__(self, *, config: Config) -> None:
|
||||||
|
super().__init__(config=config)
|
||||||
|
|
||||||
def server_list(self) -> list[str]:
|
def server_list(self) -> list[str]:
|
||||||
return [socket.gethostname()]
|
return [socket.gethostname()]
|
||||||
|
|
||||||
@ -419,12 +518,26 @@ class LocalSource(Source):
|
|||||||
data = ''.join(f'{msg}\n' for msg in queue_ids)
|
data = ''.join(f'{msg}\n' for msg in queue_ids)
|
||||||
await trio.run_process(cmd, stdin=data.encode())
|
await trio.run_process(cmd, stdin=data.encode())
|
||||||
|
|
||||||
|
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
|
||||||
|
mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
|
||||||
|
for mail in mails:
|
||||||
|
(mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
|
||||||
|
if mail_message is None:
|
||||||
|
print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
|
||||||
|
mail_messages[mail.queue_id] = (mail, mail_message)
|
||||||
|
if not mail_messages:
|
||||||
|
print("No mails found to forward.")
|
||||||
|
return
|
||||||
|
message = format_mail_forward(config=self.config, mails=mail_messages, hostname=socket.getfqdn(), recipients=recipients)
|
||||||
|
cmd = ['sendmail', '-t', '-bm', '-f', '']
|
||||||
|
await trio.run_process(cmd, stdin=message)
|
||||||
|
|
||||||
|
|
||||||
class RemoteSource(Source):
|
class RemoteSource(Source):
|
||||||
__slots__ = ('remotes',)
|
__slots__ = ('remotes',)
|
||||||
|
|
||||||
def __init__(self, remotes: dict[str, str]) -> None:
|
def __init__(self, *, config: Config, remotes: dict[str, str]) -> None:
|
||||||
super().__init__()
|
super().__init__(config=config)
|
||||||
self.remotes = remotes
|
self.remotes = remotes
|
||||||
|
|
||||||
def server_list(self) -> list[str]:
|
def server_list(self) -> list[str]:
|
||||||
@ -553,6 +666,37 @@ class RemoteSource(Source):
|
|||||||
data = ''.join(f'{msg}\n' for msg in ids)
|
data = ''.join(f'{msg}\n' for msg in ids)
|
||||||
nursery.start_soon(run_host, remote_host, data)
|
nursery.start_soon(run_host, remote_host, data)
|
||||||
|
|
||||||
|
async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
|
||||||
|
mails_by_id = {
|
||||||
|
mail.queue_id: mail
|
||||||
|
for mail in mails
|
||||||
|
}
|
||||||
|
|
||||||
|
if not mails_by_id:
|
||||||
|
print("No mails found to forward.")
|
||||||
|
|
||||||
|
async def run_host(host: str, remote_host: str, queue_ids: list[str]) -> None:
|
||||||
|
mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
|
||||||
|
for queue_id in queue_ids:
|
||||||
|
mail = mails_by_id[f'{host}/{queue_id}']
|
||||||
|
(mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
|
||||||
|
if mail_message is None:
|
||||||
|
print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
|
||||||
|
mail_messages[mail.queue_id] = (mail, mail_message)
|
||||||
|
if not mails:
|
||||||
|
return
|
||||||
|
message = format_mail_forward(config=self.config, mails=mail_messages, hostname=remote_host, recipients=recipients)
|
||||||
|
cmd = ['sendmail', '-t', '-bm', '-f', '']
|
||||||
|
cmd = ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)]
|
||||||
|
await trio.run_process(cmd, stdin=message)
|
||||||
|
|
||||||
|
by_host = self.aggregate_by_host(queue_ids=mails_by_id.keys(), allow_all=True)
|
||||||
|
|
||||||
|
async with trio.open_nursery() as nursery:
|
||||||
|
for host, ids in by_host.items():
|
||||||
|
remote_host = self.remotes[host]
|
||||||
|
nursery.start_soon(run_host, host, remote_host, ids)
|
||||||
|
|
||||||
|
|
||||||
class UserActivityStats:
|
class UserActivityStats:
|
||||||
__slots__ = ('count_mails', 'related_mails', 'cut_off')
|
__slots__ = ('count_mails', 'related_mails', 'cut_off')
|
||||||
@ -1752,6 +1896,26 @@ class CLI:
|
|||||||
return
|
return
|
||||||
await self.source.expire((mail.queue_id for mail in mails))
|
await self.source.expire((mail.queue_id for mail in mails))
|
||||||
|
|
||||||
|
@register_command(name="forward-to")
|
||||||
|
async def forward_to(self, args: str) -> None:
|
||||||
|
"""
|
||||||
|
Forward mails to given recipients.
|
||||||
|
|
||||||
|
Specify recipients as simple mail addresses (e.g. `alice@example.com`, not `Alice <alice.example.com>`).
|
||||||
|
"""
|
||||||
|
if not args:
|
||||||
|
print("No recipients given")
|
||||||
|
return
|
||||||
|
if not self.prompt_if_empty_filter():
|
||||||
|
return
|
||||||
|
mails = list(self.current_filter.filter(await self.source.get_list()))
|
||||||
|
for item in mails:
|
||||||
|
item.print(verbose=False)
|
||||||
|
ack = input_ack("Really forward those mails (y/N)? ")
|
||||||
|
if ack.lower()[:1] != "y":
|
||||||
|
return
|
||||||
|
await self.source.forward_to(mails=mails, recipients=args.split())
|
||||||
|
|
||||||
async def prompt(self) -> None:
|
async def prompt(self) -> None:
|
||||||
# main loop of CLI
|
# main loop of CLI
|
||||||
while True:
|
while True:
|
||||||
|
@ -12,3 +12,14 @@ remote-sources:
|
|||||||
# prefix with "alias:" if you don't want the first DNS label to be the alias
|
# prefix with "alias:" if you don't want the first DNS label to be the alias
|
||||||
- orgmx1:mx1.example.org
|
- orgmx1:mx1.example.org
|
||||||
- orgmx2:mx2.example.org
|
- orgmx2:mx2.example.org
|
||||||
|
|
||||||
|
forward-note: |
|
||||||
|
The postmaster decided you should know about the following mails in
|
||||||
|
the queue, probably because you need to fix something in your setup.
|
||||||
|
|
||||||
|
The postmaster will probably delete the queued message afterwards,
|
||||||
|
or have them expire (i.e. send a bounce message about failed delivery
|
||||||
|
to the sender).
|
||||||
|
|
||||||
|
forward-sender: MAILER-DAEMON # @$mx gets appended if it doesn't contain an '@'
|
||||||
|
forward-sender-name: Mail Delivery System
|
||||||
|
Loading…
Reference in New Issue
Block a user