1976 lines
75 KiB
Python
Executable File
1976 lines
75 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Copyright © Universität Stuttgart
|
|
# LICENSE: MIT
|
|
|
|
from __future__ import annotations
|
|
|
|
import abc
|
|
import argparse
|
|
import atexit
|
|
import contextlib
|
|
import dataclasses
|
|
import datetime
|
|
import enum
|
|
import functools
|
|
import inspect
|
|
import io
|
|
import json
|
|
import os
|
|
import os.path
|
|
import pathlib
|
|
import shlex
|
|
import pyparsing
|
|
import re
|
|
import readline
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
import trio
|
|
import typing
|
|
import uuid
|
|
import yaml
|
|
|
|
# Generic type variable shared by the generic helpers in this module
# (e.g. `parser_action`, `TrioParallelOrdered`, `Source.chunks`).
T = typing.TypeVar('T')
|
|
|
|
|
|
@dataclasses.dataclass(slots=True)
|
|
class Config:
|
|
DEFAULT_FORWARD_NOTE: str = """\
|
|
The postmaster decided you should know about the following mails in
|
|
the queue, probably because you need to fix something in your setup.
|
|
|
|
The postmaster will probably delete the queued message afterwards,
|
|
or have them expire (i.e. send a bounce message about failed delivery
|
|
to the sender).
|
|
"""
|
|
|
|
# if not remote_sources are configured -> use local queue
|
|
remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
|
|
forward_note: str = DEFAULT_FORWARD_NOTE
|
|
forward_sender: str = 'MAILER-DAEMON'
|
|
forward_sender_name: str = 'Mail Delivery System'
|
|
|
|
@staticmethod
|
|
def load(filename: str = '') -> Config:
|
|
config = Config()
|
|
|
|
if not filename:
|
|
default_paths = [
|
|
os.path.expanduser('~/.config/pqm.yaml'),
|
|
'/etc/pqm.yaml',
|
|
]
|
|
for path in default_paths:
|
|
if os.path.exists(path):
|
|
filename = path
|
|
break
|
|
else:
|
|
# no configfile -> default config
|
|
return config
|
|
with open(filename) as file:
|
|
data = yaml.safe_load(file)
|
|
assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"
|
|
|
|
rs_list = data.pop('remote-sources', [])
|
|
assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}"
|
|
for entry in rs_list:
|
|
assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}"
|
|
if ':' in entry:
|
|
alias, target = entry.split(':', maxsplit=1)
|
|
else:
|
|
target = entry
|
|
alias = entry.split('.', maxsplit=1)[0]
|
|
assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
|
|
if alias in config.remote_sources:
|
|
raise ValueError(
|
|
f'Duplicate alias {alias!r} in remote-sources'
|
|
f' (have: {config.remote_sources[alias]!r}, new: {target!r}',
|
|
)
|
|
config.remote_sources[alias] = target
|
|
|
|
config.forward_note = data.pop('forward-note', config.forward_note)
|
|
config.forward_sender = data.pop('forward-sender', config.forward_sender)
|
|
config.forward_sender_name = data.pop('forward-sender-name', config.forward_sender_name)
|
|
|
|
assert not data, f"Unknown config options: {data}"
|
|
|
|
return config
|
|
|
|
def forward_build_from(self, *, hostname: str) -> str:
|
|
if '@' in self.forward_sender:
|
|
sender = self.forward_sender
|
|
else:
|
|
sender = f'{self.forward_sender}@{hostname}'
|
|
return f'{self.forward_sender_name} <{sender}>'
|
|
|
|
def source(self) -> Source:
|
|
if self.remote_sources:
|
|
return RemoteSource(config=self, remotes=self.remote_sources)
|
|
else:
|
|
return LocalSource(config=self)
|
|
|
|
|
|
def input_ack(prompt: str) -> str:
    """Show `prompt` on stdout and read a short answer (at most 16 characters) from stdin.

    The answer is returned as read, i.e. including the trailing newline if
    there was one; an empty string means end of input.
    """
    # plain stdin/stdout on purpose: input() always goes through readline,
    # which isn't wanted for simple acks
    out = sys.stdout
    out.write(prompt)
    out.flush()
    answer = sys.stdin.readline(16)
    return answer
|
|
|
|
|
|
def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.Callable[[pyparsing.ParseResults], T]:
    """Decorator for custom pyparsing parse actions.

    Any exception thrown within the wrapped action is re-raised as
    `pyparsing.ParseFatalException` (stops parsing immediately), carrying the
    text of the original exception as message.
    """
    @functools.wraps(act)
    def wrapped(tokens: pyparsing.ParseResults) -> T:
        try:
            return act(tokens)
        except Exception as e:
            # chain explicitly so the original error stays visible as the cause
            raise pyparsing.ParseFatalException(str(e)) from e

    return wrapped
|
|
|
|
|
|
def format_mail_forward(
    *,
    config: Config,
    mails: dict[str, tuple[Mail, bytes | None]],
    hostname: str,
    recipients: typing.Iterable[str],
) -> bytes:
    """Build a multipart/mixed message that forwards queued mails.

    The message consists of the configured forward note, a verbose listing of
    all given mails and one part per mail: the message itself
    (`message/rfc822`) or, if its content is `None`, a short text note that it
    couldn't be found.

    `mails` maps queue id -> (mail metadata, raw message or None); `hostname`
    is used for the sender address and the subject; each entry of
    `recipients` becomes a `To:` header.  Returns the raw message bytes.
    """
    bin_buf = io.BytesIO()
    # text layer over bin_buf; write_through so text writes and raw writes
    # (mail contents) can be interleaved
    buf = io.TextIOWrapper(bin_buf, write_through=True)
    boundary = f"postmaster-forward-{uuid.uuid4().hex}"

    def start_part(content_type: str, filename: str | None = None) -> None:
        # boundary line + part headers + empty line
        buf.write(f"--{boundary}\n")
        buf.write(f"Content-Type: {content_type}\n")
        if filename is not None:
            buf.write(f"Content-Disposition: inline; filename=\"{filename}\"\n")
        buf.write("\n")

    def end_part() -> None:
        # ensure parts are terminated by newline, otherwise the following
        # boundary wouldn't start on a line of its own
        with bin_buf.getbuffer() as view:  # release the view before writing again
            terminated = bytes(view[-1:]) == b'\n'
        if not terminated:
            buf.write("\n")

    buf.write(f'From: {config.forward_build_from(hostname=hostname)}\n')
    for recp in recipients:
        buf.write(f'To: <{recp}>\n')
    buf.write(f"Subject: Deferred mails on {hostname}\n")
    buf.write("MIME-Version: 1.0\n")
    buf.write(f"Content-Type: multipart/mixed; boundary=\"{boundary}\"\n")
    buf.write("\n")
    buf.write("This is a message with multiple parts in MIME format.\n")

    # part 1: the note for the recipient
    start_part("text/plain")
    buf.write(config.forward_note)
    end_part()

    # part 2: verbose listing of all mails
    start_part("text/plain", "queue-list.txt")
    for mail, _ in mails.values():
        mail.print(verbose=True, out=buf)

    # one part per mail
    for queue_id, (_, mail_message) in mails.items():
        basename = queue_id.replace('/', '_')
        if mail_message is None:
            # this should be very unlikely unless you try to forward messages from the active queue
            start_part("text/plain", f"{basename}.txt")
            buf.write(f"Message {queue_id} couldn't be found (anymore); may have been sent.")
        else:
            start_part("message/rfc822", f"{basename}.eml")
            bin_buf.write(mail_message)
        # also needed for the "not found" note, which has no trailing newline
        end_part()

    buf.write(f"--{boundary}--\n")
    return bin_buf.getvalue()
|
|
|
|
|
|
class TrioParallelOrdered(typing.Generic[T]):
    """Run jobs in parallel but hand their results to a handler in start order.

    Used by `trio_parallel_ordered`.
    """

    def __init__(self, nursery: trio.Nursery, result_handler: typing.Callable[[T], typing.Awaitable[None]]) -> None:
        # basic idea: each task gets its own oneshot channel (capacity 1) to
        # deliver its result; the receiving ends are queued in self._tx in the
        # order the tasks were started. _handle_results takes those receiving
        # ends one by one (still ordered) and waits for the result of each.
        self.nursery = nursery
        self.result_handler = result_handler
        # at most 8 jobs run at the same time
        self.limit = trio.CapacityLimiter(8)

        pending_tx: trio.MemorySendChannel[trio.MemoryReceiveChannel[T]]
        pending_rx: trio.MemoryReceiveChannel[trio.MemoryReceiveChannel[T]]
        pending_tx, pending_rx = trio.open_memory_channel(128)
        self._tx = pending_tx

        async def _handle_results() -> None:
            slot: trio.MemoryReceiveChannel[T]
            async for slot in pending_rx:
                try:
                    value = await slot.receive()
                except trio.EndOfChannel:
                    # sending side was closed without a result; nothing to handle
                    continue
                await self.result_handler(value)

        self.nursery.start_soon(_handle_results)

    async def close(self) -> None:
        """Close the queue of pending result channels (no further jobs can be queued)."""
        await self._tx.aclose()

    async def _spawn(self, job, args, kwargs, *, task_status=trio.TASK_STATUS_IGNORED) -> None:
        # task body for nursery.start(): queue the result slot first (this
        # fixes the result order), then wait for a free slot in the limiter
        # before reporting "started" and running the job
        result_tx: trio.MemorySendChannel[T]
        result_rx: trio.MemoryReceiveChannel[T]
        result_tx, result_rx = trio.open_memory_channel(1)
        await self._tx.send(result_rx)
        async with self.limit:
            task_status.started()
            async with result_tx:
                outcome = await job(*args, **kwargs)
                await result_tx.send(outcome)

    async def start(self, job, *args, **kwargs) -> None:
        """Start `job(*args, **kwargs)`; returns once the job got a slot in the limiter."""
        await self.nursery.start(self._spawn, job, args, kwargs)
|
|
|
|
|
|
@contextlib.asynccontextmanager
async def trio_parallel_ordered(
    result_handler: typing.Callable[[T], typing.Awaitable[None]],
) -> typing.AsyncGenerator[TrioParallelOrdered[T], None]:
    """Async context manager providing a `TrioParallelOrdered` in a fresh nursery.

    Usage:

        async def handle_result_of_some_job_function(result) -> None: ...
        async def some_job_function(foo) -> ResultType: ...
        async with trio_parallel_ordered(handle_result_of_some_job_function) as tpo:
            for foo in biglist:
                await tpo.start(some_job_function, foo)
    """
    async with trio.open_nursery() as nursery:
        runner: TrioParallelOrdered[T] = TrioParallelOrdered(result_handler=result_handler, nursery=nursery)
        yield runner
        # body finished normally: no more jobs will be queued
        await runner.close()
|
|
|
|
|
|
@dataclasses.dataclass(slots=True)
|
|
class Recipient:
|
|
"""Recipient in a postfix mail"""
|
|
address: str
|
|
# "error message" from last delivery attempt
|
|
delay_reason: str
|
|
|
|
@staticmethod
|
|
def from_json(d: dict) -> Recipient:
|
|
return Recipient(
|
|
address=d['address'],
|
|
delay_reason=d.get('delay_reason', ''),
|
|
)
|
|
|
|
|
|
class QueueName(enum.Enum):
    """Name of the queue a mail is in.

    The values are the strings found in the `queue_name` field of the queue
    JSON (see `Mail.from_json`) and passed as queue names to postsuper.
    """

    MAILDROP = "maildrop"
    HOLD = "hold"
    INCOMING = "incoming"
    ACTIVE = "active"
    DEFERRED = "deferred"
    # never passed to postsuper (filtered out as "invalid" queue name there)
    CORRUPT = "corrupt"
|
|
|
|
|
|
# Set of all queue names, i.e. "no restriction" for Filter.possible_queues().
# NOTE: TrueFilter.possible_queues() returns this very object - don't mutate it.
ALL_QUEUE_NAMES: set[QueueName] = set(QueueName)
|
|
|
|
|
|
def json_decode_stream(data: str):
    """Yield the values of a string containing concatenated JSON documents.

    Documents may be separated (and surrounded) by arbitrary whitespace.
    Decoding is lazy; invalid JSON raises the decoder's error when the
    generator reaches it.
    """
    decoder = json.JSONDecoder()
    # ignore trailing whitespace, so the loop ends right after the last document
    end = len(data.rstrip())
    pos = 0
    while True:
        # skip whitespace in front of the next document
        while pos < end and data[pos].isspace():
            pos += 1
        if pos >= end:
            return
        value, pos = decoder.raw_decode(data, pos)
        yield value
|
|
|
|
|
|
@dataclasses.dataclass(slots=True)
class Mail:
    """Metadata for mail in postfix queue"""

    queue_name: QueueName
    # for remote sources prefixed with "alias/" (see `read_postqueue_json`)
    queue_id: str
    # created with `datetime.fromtimestamp`, i.e. naive local time
    arrival_time: datetime.datetime
    message_size: int
    sender: str
    recipients: list[Recipient]
    # forced_expire: bool

    @staticmethod
    def from_json(d: dict) -> Mail:
        """Create a mail from one decoded JSON object of the queue listing."""
        return Mail(
            queue_name=QueueName(d['queue_name']),
            queue_id=d['queue_id'],
            arrival_time=datetime.datetime.fromtimestamp(d['arrival_time']),
            message_size=d['message_size'],
            sender=d['sender'],
            recipients=[Recipient.from_json(recp) for recp in d['recipients']],
        )

    @staticmethod
    def read_postqueue_json(data: str, id_prefix: str = '') -> list[Mail]:
        """Parse a stream of JSON objects (`postqueue -j` output) into mails.

        `id_prefix` is prepended to every queue id.
        """
        queue = []
        for obj in json_decode_stream(data):
            mail = Mail.from_json(obj)
            mail.queue_id = id_prefix + mail.queue_id
            queue.append(mail)
        return queue

    def print(self, *, verbose: bool, out: typing.TextIO | None = None) -> None:
        """Print the mail to `out`.

        Verbose: one summary line followed by one line per recipient (plus its
        delay reason, if any).  Otherwise a single line with the number of
        recipients and the last one.

        `out=None` means the current `sys.stdout` (looked up when printing,
        not when this method was defined, so redirected stdout is honoured).
        """
        flag = CLI.QUEUE_FLAGS.get(self.queue_name, ' ')
        # common start of all output variants
        summary = (
            f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
            f"{self.sender:<60s}"
        )
        if verbose:
            print(summary, file=out)
            if not self.recipients:
                print(f"{'':21}No recipients listed for this mail?", file=out)
            for recpt in self.recipients:
                print(f"{'':21}{recpt.address}", file=out)
                if recpt.delay_reason:
                    print(f"{'':29}{recpt.delay_reason}", file=out)
        elif self.recipients:
            cnt_recpts = len(self.recipients)
            last_recpt = self.recipients[-1].address
            print(f"{summary} (Targets: {cnt_recpts}, last: {last_recpt})", file=out)
        else:
            print(f"{summary} (No recipients listed for this mail?)", file=out)
|
|
|
|
|
|
# abstract collection/cluster of postfix nodes (or just a single one)
class Source(abc.ABC):
    """Abstract interface to the mail queues of one or more postfix servers."""

    __slots__ = ('config',)

    def __init__(self, *, config: Config) -> None:
        self.config = config

    @abc.abstractmethod
    def server_list(self) -> list[str]:
        """Return the list of server names in the collection."""
        raise NotImplementedError()

    @typing.final
    @staticmethod
    def chunks(data: typing.Iterable[T], *, chunk_size: int = 512) -> typing.Generator[list[T], None, None]:
        """Split "list" of input elements into chunks of at most `chunk_size` items.

        Works for any iterable; lists are sliced directly.  Raises
        `ValueError` (on first iteration) if `chunk_size` isn't positive.
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be positive, got {chunk_size!r}")
        if isinstance(data, list):
            # slice by offset instead of repeatedly copying the remaining list
            for start in range(0, len(data), chunk_size):
                yield data[start:start + chunk_size]
        else:
            chunk: list[T] = []
            for item in data:
                chunk.append(item)
                if len(chunk) >= chunk_size:
                    yield chunk
                    chunk = []
            if chunk:
                # incomplete last chunk
                yield chunk

    @abc.abstractmethod
    async def get_list(self) -> list[Mail]:
        """Retrieve list of mails in all queues."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        """Basically postcat; `flags` are used as command line arguments to postcat.

        Returns stdout and stderr of postcat; stdout is `None` on failure.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def flush_all(self) -> None:
        """Flush the queue: attempt to deliver all queued mail."""
        # postqueue -f
        raise NotImplementedError()

    @abc.abstractmethod
    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery of deferred mail with the specified queue IDs."""
        # postqueue -i MSG1 -i MSG2 -i ...
        raise NotImplementedError()

    @abc.abstractmethod
    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ) -> None:
        """Abstract way to call `postsuper`.

        `action` is the command line flag for the action to take;
        `from_queues` optionally restricts the queues to act on (the default
        `()` means no restriction).
        """
        raise NotImplementedError()

    async def delete(self, queue_ids: typing.Iterable[str], *, from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Delete one message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-d', queue_ids=queue_ids, from_queues=from_queues)

    async def hold(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Put mail "on hold" so that no attempt is made to deliver it.
        Move one message with the named queue ID from the named mail queue(s) (default: incoming, active and deferred) to the hold queue.
        """
        await self._postsuper('-h', queue_ids=queue_ids, from_queues=from_queues)

    async def release(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Release mail that was put "on hold".
        Move one message with the named queue ID from the named mail queue(s) (default: hold) to the deferred queue.
        """
        await self._postsuper('-H', queue_ids=queue_ids, from_queues=from_queues)

    async def requeue(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Requeue the message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-r', queue_ids=queue_ids, from_queues=from_queues)

    async def expire(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Expire the message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-e', queue_ids=queue_ids, from_queues=from_queues)

    async def expire_release(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Expire and release (if in hold) the message with the named queue ID from the named mail queue(s)
        (default queues: hold, incoming, active and deferred).
        """
        await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)

    @abc.abstractmethod
    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        """Send the given queued mails (as attachments) to `recipients`."""
        raise NotImplementedError()
|
|
|
|
|
|
class LocalSource(Source):
    """Source for the postfix queue of the local machine (runs the postfix tools directly)."""

    __slots__ = ()

    def __init__(self, *, config: Config) -> None:
        super().__init__(config=config)

    def server_list(self) -> list[str]:
        """Return the local hostname as only server."""
        return [socket.gethostname()]

    async def get_list(self) -> list[Mail]:
        """Retrieve all queued mails via `postqueue -j`, sorted by arrival time."""
        res: subprocess.CompletedProcess = await trio.run_process(
            ['/usr/sbin/postqueue', '-j'],
            capture_stdout=True,
        )
        queue = Mail.read_postqueue_json(res.stdout.decode('utf-8'))
        # sort by incoming date, newest last
        return sorted(queue, key=lambda item: item.arrival_time)

    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        """stdout and stderr of postcat; stdout is None if we got non-zero status"""
        if not re.fullmatch(r'[A-Z0-9]+', queue_id):
            return (None, f"Invalid queue id: {queue_id!r} - expected /[A-Z0-9]+/".encode())
        res: subprocess.CompletedProcess = await trio.run_process(
            # every flag is a separate argument (same as in RemoteSource.get_mail)
            ['/usr/sbin/postcat', *flags, '-q', queue_id],
            capture_stdout=True,
            capture_stderr=True,
            check=False,
        )
        if res.returncode != 0:
            # expect some error was printed to stderr
            return (None, res.stderr)
        return (res.stdout, res.stderr)

    async def flush_all(self) -> None:
        """Flush the queue (`postqueue -f`)."""
        await trio.run_process(['/usr/sbin/postqueue', '-f'])

    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery of the given mails (`postqueue -i`, one call per mail)."""
        # postqueue only deals with single messages, because the flush daemon
        # only accepts a single (flush) request per connection...
        for msg in queue_ids:
            await trio.run_process(['/usr/sbin/postqueue', '-i', msg])

    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ) -> None:
        """Run `postsuper <action> - [queues...]`, passing the queue ids on stdin (one per line).

        A set for `from_queues` restricts the queues; if it contains no valid
        queue name nothing is done.
        """
        cmd = ['/usr/sbin/postsuper', action, '-']
        if isinstance(from_queues, set):
            from_queues = from_queues - {QueueName.CORRUPT}  # remove "invalid" queue names
            if not from_queues:
                # empty set of queues allowed -> no queue allowed
                return
            cmd += [qn.value for qn in from_queues]
        data = ''.join(f'{msg}\n' for msg in queue_ids)
        await trio.run_process(cmd, stdin=data.encode())

    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        """Forward the given mails to `recipients` in a single message sent through sendmail."""
        mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
        for mail in mails:
            (mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
            if mail_message is None:
                # error text comes from an external tool; don't fail on odd bytes
                print(f"Failed to get mail {mail.queue_id}: {err.decode(errors='replace')}")
            # missing mails are kept (as None); the forward mentions them
            mail_messages[mail.queue_id] = (mail, mail_message)
        if not mail_messages:
            print("No mails found to forward.")
            return
        message = format_mail_forward(
            config=self.config,
            mails=mail_messages,
            hostname=socket.getfqdn(),
            recipients=recipients,
        )
        # recipients are taken from the headers (-t); empty envelope sender
        cmd = ['sendmail', '-t', '-bm', '-f', '']
        await trio.run_process(cmd, stdin=message)
|
|
|
|
|
|
class RemoteSource(Source):
    """Source for a set of remote postfix servers, reached through ssh as root.

    Mails are addressed by "long" queue ids of the form `ALIAS/LOCALID`, where
    ALIAS is a key of `remotes` (alias -> host).
    """

    __slots__ = ('remotes',)

    def __init__(self, *, config: Config, remotes: dict[str, str]) -> None:
        super().__init__(config=config)
        self.remotes = remotes

    def server_list(self) -> list[str]:
        """Return the aliases of all remotes."""
        return list(self.remotes)

    @staticmethod
    def _ssh(remote_host: str, command: str) -> list[str]:
        # command line to run a shell command on a remote host (non-interactive)
        return ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, command]

    @staticmethod
    def decode_queue_id(queue_id: str, *, allow_all: bool = False) -> tuple[str, str]:
        """Split "remote mail id" into alias for remote and id on remote system.

        Raises `ValueError` for malformed ids (and for the id `ALL` unless
        `allow_all` is set).
        """
        host, sep, local_id = queue_id.partition('/')
        if not sep:
            raise ValueError(f"Invalid mail id: {queue_id!r} - expected HOST/localid")
        if not re.fullmatch(r'[A-Z0-9]+', local_id):
            raise ValueError(f"Invalid queue id: {local_id!r} - expected /[A-Z0-9]+/")
        if not allow_all and local_id == 'ALL':
            raise ValueError(f"Invalid queue id: {local_id!r} - ALL not allowed")
        return (host, local_id)

    async def get_list(self) -> list[Mail]:
        """Retrieve the queues of all remotes in parallel, sorted by arrival time."""
        queue: list[Mail] = []

        async def get_host(remote_id: str, remote_host: str) -> None:
            remote_id_prefix = remote_id + '/'
            res: subprocess.CompletedProcess = await trio.run_process(
                self._ssh(remote_host, '/usr/sbin/postqueue -j'),
                capture_stdout=True,
            )
            queue.extend(Mail.read_postqueue_json(res.stdout.decode('utf-8'), id_prefix=remote_id_prefix))

        async with trio.open_nursery() as nursery:
            for remote_id, remote_host in self.remotes.items():
                nursery.start_soon(get_host, remote_id, remote_host)

        # sort by incoming date, newest last
        return sorted(queue, key=lambda item: item.arrival_time)

    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        """stdout and stderr of postcat on the remote; stdout is None on any failure"""
        try:
            host, queue_id = self.decode_queue_id(queue_id)
        except ValueError as e:
            return (None, str(e).encode())
        if host not in self.remotes:
            return (None, f"Unknown remote host: {host!r}".encode())
        remote_host = self.remotes[host]
        cmd = ['/usr/sbin/postcat', *flags, '-q', queue_id]
        res: subprocess.CompletedProcess = await trio.run_process(
            self._ssh(remote_host, shlex.join(cmd)),
            capture_stdout=True,
            capture_stderr=True,
            check=False,
        )
        if res.returncode != 0:
            # expect some error was printed to stderr
            return (None, res.stderr)
        return (res.stdout, res.stderr)

    async def flush_all(self) -> None:
        """Flush the queues of all remotes in parallel (`postqueue -f`)."""
        async def flush_host(remote_host: str) -> None:
            await trio.run_process(self._ssh(remote_host, '/usr/sbin/postqueue -f'))

        async with trio.open_nursery() as nursery:
            for remote_host in self.remotes.values():
                nursery.start_soon(flush_host, remote_host)

    def aggregate_by_host(self, queue_ids: typing.Iterable[str], *, allow_all: bool) -> dict[str, list[str]]:
        """Group "long ids" (with alias/ prefix) by alias (remote host).

        The plain id `ALL` (if allowed) is added for every remote.  Raises
        `ValueError` for invalid ids and unknown aliases.
        """
        by_host: dict[str, list[str]] = {}
        for long_queue_id in queue_ids:
            if long_queue_id == 'ALL':
                if not allow_all:
                    raise ValueError(f"Invalid queue id: {long_queue_id!r} - ALL not allowed")
                for host in self.remotes:
                    by_host.setdefault(host, []).append('ALL')
                continue
            host, queue_id = self.decode_queue_id(long_queue_id, allow_all=allow_all)
            by_host.setdefault(host, []).append(queue_id)
        for host, ids in by_host.items():
            if host not in self.remotes:
                raise ValueError(f"Unknown remote host {host!r}, used for {len(ids)} queue ids")
        return by_host

    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery of the given mails on their remotes."""
        by_host = self.aggregate_by_host(queue_ids=queue_ids, allow_all=False)

        async def run_host(remote_host: str, ids: list[str]) -> None:
            for run in self.chunks(ids):
                # postqueue only deals with single messages, because the flush daemon
                # only accepts a single (flush) request per connection...
                cmd = 'for msg in ' + shlex.join(run) + '; do /usr/sbin/postqueue -i "${msg}"; done'
                await trio.run_process(self._ssh(remote_host, cmd))

        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                nursery.start_soon(run_host, self.remotes[host], ids)

    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ) -> None:
        """Run `postsuper <action> - [queues...]` on each affected remote.

        The local queue ids are passed on stdin (one per line).  A set for
        `from_queues` restricts the queues; if it contains no valid queue name
        nothing is done.
        """
        cmd = ['/usr/sbin/postsuper', action, '-']
        if isinstance(from_queues, set):
            from_queues = from_queues - {QueueName.CORRUPT}  # remove "invalid" queue names
            if not from_queues:
                # empty set of queues allowed -> no queue allowed
                return
            cmd += [qn.value for qn in from_queues]
        by_host = self.aggregate_by_host(queue_ids=queue_ids, allow_all=True)

        async def run_host(remote_host: str, data: str) -> None:
            await trio.run_process(self._ssh(remote_host, shlex.join(cmd)), stdin=data.encode())

        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                data = ''.join(f'{msg}\n' for msg in ids)
                nursery.start_soon(run_host, self.remotes[host], data)

    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        """Forward the given mails to `recipients`: one message per remote, sent from that remote."""
        mails_by_id = {
            mail.queue_id: mail
            for mail in mails
        }

        if not mails_by_id:
            print("No mails found to forward.")
            # nothing to do (same as LocalSource.forward_to)
            return

        async def run_host(host: str, remote_host: str, queue_ids: list[str]) -> None:
            mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
            for queue_id in queue_ids:
                mail = mails_by_id[f'{host}/{queue_id}']
                (mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
                if mail_message is None:
                    # error text comes from an external tool; don't fail on odd bytes
                    print(f"Failed to get mail {mail.queue_id}: {err.decode(errors='replace')}")
                # missing mails are kept (as None); the forward mentions them
                mail_messages[mail.queue_id] = (mail, mail_message)
            # check what was collected for this host; the outer `mails` may be
            # any iterable and says nothing about it
            if not mail_messages:
                return
            message = format_mail_forward(
                config=self.config,
                mails=mail_messages,
                hostname=remote_host,
                recipients=recipients,
            )
            # recipients are taken from the headers (-t); empty envelope sender
            cmd = ['sendmail', '-t', '-bm', '-f', '']
            await trio.run_process(self._ssh(remote_host, shlex.join(cmd)), stdin=message)

        by_host = self.aggregate_by_host(queue_ids=mails_by_id.keys(), allow_all=True)

        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                nursery.start_soon(run_host, host, self.remotes[host], ids)
|
|
|
|
|
|
class UserActivityStats:
    """Find most active addresses (senders and recipients) for `health` command.

    Usage: `count()` every mail, `prepare()` once, then call
    `extract_highest_user()` until it returns `None`.
    """

    __slots__ = ('count_mails', 'related_mails', 'cut_off')

    def __init__(self, cut_off: int = 10) -> None:
        # count recipients individually for each sender, and recipients just once
        # (all addresses are used in lower case)
        self.count_mails: dict[str, int] = {}
        self.related_mails: dict[str, list[Mail]] = {}
        # addresses with a count below this are dropped
        self.cut_off = cut_off

    def _credit(self, user: str, amount: int, item: Mail) -> None:
        # add `amount` to the count of `user` and remember the mail for it
        if user not in self.count_mails:
            self.count_mails[user] = 0
            self.related_mails[user] = []
        self.count_mails[user] += amount
        self.related_mails[user].append(item)

    def _discount(self, user: str, amount: int) -> None:
        # reduce the count of a still listed `user`; drop it once below cut_off
        if user not in self.count_mails:
            return
        self.count_mails[user] -= amount
        if self.count_mails[user] < self.cut_off:
            self.count_mails.pop(user)

    def count(self, item: Mail) -> None:
        """Account a mail for its sender and recipients; mails on hold are ignored."""
        if QueueName.HOLD == item.queue_name:
            return
        sender = item.sender.lower()
        if item.sender != 'MAILER-DAEMON':
            # a sender is counted once per recipient
            self._credit(sender, len(item.recipients), item)
        for recp in {recp.address.lower() for recp in item.recipients}:
            if sender == recp:
                continue  # count user just once per mail
            self._credit(recp, 1, item)

    def prepare(self) -> None:
        """Drop all addresses with a count below `cut_off`."""
        for (user, count) in list(self.count_mails.items()):
            if count < self.cut_off:
                self.count_mails.pop(user)

    def extract_highest_user(self) -> None | tuple[str, int, list[Mail]]:
        """Remove and return the most active address as `(address, count, mails)`.

        The mails of that address are subtracted from the counts of the other
        addresses involved.  Returns `None` when no address is left.
        """
        if not self.count_mails:
            return None
        # first address with the highest count; works for any counts (a manual
        # "greater than 0" search would find nothing if no count is positive)
        user = max(self.count_mails, key=self.count_mails.__getitem__)
        activity = self.count_mails.pop(user)
        mails = self.related_mails.pop(user)
        for item in mails:
            sender = item.sender.lower()
            # mirror count(): MAILER-DAEMON was never counted as sender
            if item.sender != 'MAILER-DAEMON' and user != sender:
                self._discount(sender, len(item.recipients))
            for recp in {recp.address.lower() for recp in item.recipients}:
                if sender == recp:
                    continue  # count user just once per mail
                if user != recp:
                    self._discount(recp, 1)
        return (user, activity, mails)
|
|
|
|
|
|
class Filter(abc.ABC):
    """abstract base class for filter expressions"""

    __slots__ = ()

    def __init__(self) -> None:
        pass

    @abc.abstractmethod
    def matches(self, mail: Mail) -> bool:
        """Whether mail matches the filter"""
        raise NotImplementedError()

    @abc.abstractmethod
    def possible_queues(self) -> set[QueueName]:
        """Which queues matched mails are possible from (all unless explicitly restricted)"""
        raise NotImplementedError()

    @abc.abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()

    @typing.final
    def filter(self, mails: typing.Iterable[Mail]) -> typing.Generator[Mail, None, None]:
        """Filter list of mails"""
        yield from (candidate for candidate in mails if self.matches(candidate))

    @typing.final
    def filter_ids(self, mails: typing.Iterable[Mail]) -> typing.Generator[str, None, None]:
        """Filter list of mails but only return mail ids"""
        yield from (candidate.queue_id for candidate in mails if self.matches(candidate))

    def __and__(self, other: typing.Any) -> Filter:
        """Combine two filter into one that only matches mails that matched both"""
        if not isinstance(other, Filter):
            return NotImplemented
        # shortcuts for the constant filters
        if other is TRUE_FILTER:
            return self
        if other is FALSE_FILTER:
            return other
        # flatten: members of another conjunction are added directly
        tail = other.expressions if isinstance(other, AndFilter) else [other]
        return AndFilter([self] + tail)

    def __or__(self, other: typing.Any) -> Filter:
        """Combine two filter into one that matches mails that matched at least one"""
        if not isinstance(other, Filter):
            return NotImplemented
        # shortcuts for the constant filters
        if other is TRUE_FILTER:
            return other
        if other is FALSE_FILTER:
            return self
        # flatten: members of another disjunction are added directly
        tail = other.expressions if isinstance(other, OrFilter) else [other]
        return OrFilter([self] + tail)

    def __invert__(self) -> Filter:
        """Create filter that exactly matches mail that didn't match the original filter"""
        return NotFilter(self)
|
|
|
|
|
|
class SingleExprFilter(Filter):
    """abstract base class for filters that don't need (...) around in representations of members in combined filters"""

    __slots__ = ()
|
|
|
|
|
|
class FalseFilter(SingleExprFilter):
    """constant false - matches no mail"""

    __slots__ = ()

    def matches(self, mail: Mail) -> bool:
        return False

    def possible_queues(self) -> set[QueueName]:
        # nothing matches -> no queue possible
        return set()

    def __repr__(self) -> str:
        return '0'

    def __and__(self, other: typing.Any) -> Filter:
        # false and X == false
        return self if isinstance(other, Filter) else NotImplemented

    def __or__(self, other: typing.Any) -> Filter:
        # false or X == X
        return other if isinstance(other, Filter) else NotImplemented

    def __invert__(self) -> Filter:
        return TRUE_FILTER
|
|
|
|
|
|
class TrueFilter(SingleExprFilter):
    """constant true - matches all mail"""

    __slots__ = ()

    def matches(self, mail: Mail) -> bool:
        return True

    def possible_queues(self) -> set[QueueName]:
        # NOTE: this is the module level set itself, not a copy
        return ALL_QUEUE_NAMES

    def __repr__(self) -> str:
        return '1'

    def __and__(self, other: typing.Any) -> Filter:
        # true and X == X
        return other if isinstance(other, Filter) else NotImplemented

    def __or__(self, other: typing.Any) -> Filter:
        # true or X == true
        return self if isinstance(other, Filter) else NotImplemented

    def __invert__(self) -> Filter:
        return FALSE_FILTER
|
|
|
|
|
|
# Shared constant-false filter; Filter.__and__/__or__ recognize it by identity.
FALSE_FILTER = FalseFilter()
|
|
# Shared constant-true filter; Filter.__and__/__or__ recognize it by identity.
TRUE_FILTER = TrueFilter()
|
|
|
|
|
|
class AndFilter(Filter):
    """Conjunction of filters: matches mails that match all sub-expressions (all mails if there are none)"""

    __slots__ = ('expressions',)

    def __init__(self, expressions: list[Filter] | tuple[()] = ()) -> None:
        super().__init__()
        self.expressions: list[Filter] = expressions or []

    def matches(self, mail: Mail) -> bool:
        return all(expr.matches(mail) for expr in self.expressions)

    def possible_queues(self) -> set[QueueName]:
        res = set(ALL_QUEUE_NAMES)  # clone set
        for expr in self.expressions:
            res &= expr.possible_queues()
        return res

    def __repr__(self) -> str:
        if not self.expressions:
            return '1'
        if len(self.expressions) == 1:
            return repr(self.expressions[0])
        # members that could be misread inside an "and" chain get (...)
        return ' and '.join(
            repr(e) if isinstance(e, (SingleExprFilter, AndFilter)) else f'({e!r})'
            for e in self.expressions
        )

    def __and__(self, other: typing.Any) -> AndFilter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, AndFilter):
            # flatten nested conjunctions
            return AndFilter(self.expressions + other.expressions)
        return AndFilter(self.expressions + [other])

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        """Parse action: combine `[[filter, 'and', filter, ...]]` into a single filter.

        Malformed token lists raise `ValueError`, which `parser_action`
        reports as fatal parse error.
        """
        # explicit checks instead of `assert`: they must also work (and must
        # not lose the side effects of consuming tokens) under `python -O`
        if len(toks) != 1:
            raise ValueError(f"Expected exactly one token group in {toks}")
        inner_toks = list(toks[0])
        if len(inner_toks) % 2 != 1:
            raise ValueError(f"need odd number of tokens in {toks}")
        token = inner_toks[0]
        if not isinstance(token, Filter):
            raise ValueError(f"Non-filter in {toks}")
        # remaining tokens come in ('and', filter) pairs
        for operator, next_token in zip(inner_toks[1::2], inner_toks[2::2]):
            if operator != 'and':
                raise ValueError(f"Missing 'and' in {toks}")
            if not isinstance(next_token, Filter):
                raise ValueError(f"Non-filter in {toks}")
            token = token & next_token
        return token
|
|
|
|
|
|
class OrFilter(Filter):
    # Disjunction of sub-filters: a mail matches if any member matches.
    # An empty disjunction matches nothing (and is shown as '0').
    __slots__ = ('expressions',)

    def __init__(self, expressions: list[Filter] | tuple[()] = ()) -> None:
        super().__init__()
        # a non-empty list is stored as given (not copied); the empty
        # default always becomes a fresh list so instances never share it
        self.expressions: list[Filter] = expressions or []

    def matches(self, mail: Mail) -> bool:
        return any(expr.matches(mail) for expr in self.expressions)

    def possible_queues(self) -> set[QueueName]:
        # union of the queues any member could match in
        res: set[QueueName] = set()
        for expr in self.expressions:
            res |= expr.possible_queues()
        return res

    def __repr__(self) -> str:
        if not self.expressions:
            return '0'
        if len(self.expressions) == 1:
            return repr(self.expressions[0])
        # single expressions and nested disjunctions are embedded as is,
        # everything else is wrapped in parentheses
        return ' or '.join(
            repr(e) if isinstance(e, (SingleExprFilter, OrFilter)) else f'({e!r})'
            for e in self.expressions
        )

    def __or__(self, other: typing.Any) -> OrFilter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, OrFilter):
            # flatten: merge both member lists into one disjunction
            return OrFilter(self.expressions + other.expressions)
        return OrFilter(self.expressions + [other])

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        # Expects a single group of tokens `filter ('or' filter)*` and
        # folds the operands together with `|`.
        assert len(toks) == 1
        inner_toks = list(toks[0])
        assert len(inner_toks) % 2 == 1, f"need odd number of tokens in {toks}"
        result = inner_toks[0]
        assert isinstance(result, Filter)
        # Walk the (operator, operand) pairs. The tokens are consumed outside
        # of the assert statements so the parser keeps working when asserts
        # are stripped (python -O).
        for operator, operand in zip(inner_toks[1::2], inner_toks[2::2]):
            assert operator == 'or', f"Missing 'or' in {toks}"
            assert isinstance(operand, Filter), f"Non-filter in {toks}"
            result = result | operand
        return result
|
|
|
|
|
|
class NotFilter(SingleExprFilter):
    # Negation of a wrapped filter.
    __slots__ = ('expression',)

    def __init__(self, expression: Filter) -> None:
        super().__init__()
        self.expression = expression

    def matches(self, mail: Mail) -> bool:
        return not self.expression.matches(mail)

    def possible_queues(self) -> set[QueueName]:
        inner = self.expression
        if not isinstance(inner, QueueFilter):
            # in general a negation can match mails from any queue
            return ALL_QUEUE_NAMES
        # for a queue filter we actually know the negation: all other queues
        return ALL_QUEUE_NAMES - inner.select

    def __repr__(self) -> str:
        inner = repr(self.expression)
        if not isinstance(self.expression, SingleExprFilter):
            # compound expressions need parentheses behind `not`
            inner = f'({inner})'
        return f'not {inner}'

    def __invert__(self) -> Filter:
        # double negation: hand back the wrapped filter
        return self.expression

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        # Expects one group holding exactly `'not' filter`.
        assert len(toks) == 1
        group = toks[0]
        assert len(group) == 2
        keyword, operand = group[0], group[1]
        assert keyword == 'not'
        assert isinstance(operand, Filter)
        return ~operand
|
|
|
|
|
|
class QueueFilter(SingleExprFilter):
    """match mails based on queue they are in"""

    __slots__ = ('select',)

    def __init__(self, select: list[QueueName]) -> None:
        # stored as a set: duplicates collapse and membership tests are cheap
        self.select = set(select)

    def matches(self, mail: Mail) -> bool:
        return mail.queue_name in self.select

    def possible_queues(self) -> set[QueueName]:
        return self.select

    def __repr__(self) -> str:
        if not self.select:
            return '0'
        # Sort the names: iteration order of a set of enum members is not
        # stable between interpreter runs, which made the output (e.g. of the
        # `current` command) change from run to run.
        names = sorted(qn.value for qn in self.select)
        return f'queue {",".join(names)}'

    def __and__(self, other: typing.Any) -> Filter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, QueueFilter):
            # two queue filters combine into one over the common queues
            select = self.select & other.select
            if select:
                return QueueFilter(list(select))
            # no queue in common: nothing can match
            return FALSE_FILTER
        return super().__and__(other)

    def __or__(self, other: typing.Any) -> Filter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, QueueFilter):
            # two queue filters combine into one over all selected queues
            return QueueFilter(list(self.select | other.select))
        return super().__or__(other)

    # NOTE(review): a dedicated __invert__ was left disabled; NotFilter
    # special-cases QueueFilter in possible_queues() instead - confirm before
    # re-enabling.
    # def __invert__(self) -> Filter:
    #     select = ALL_QUEUE_NAMES - self.select
    #     return QueueFilter(list(select))

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> QueueFilter:
        # every token is a queue name accepted by the QueueName enum
        return QueueFilter([QueueName(t) for t in toks])
|
|
|
|
|
|
class AddressSelector(enum.Enum):
    # which addresses of a mail to look at
    SENDER = "from"
    RECIPIENT = "to"
    ANY = "address"

    def extract(self, mail: Mail) -> typing.Generator[str, None, None]:
        # Yields the sender first (SENDER, ANY), then each recipient address
        # (RECIPIENT, ANY).
        if self in (AddressSelector.SENDER, AddressSelector.ANY):
            yield mail.sender
        if self in (AddressSelector.RECIPIENT, AddressSelector.ANY):
            yield from (recipient.address for recipient in mail.recipients)
|
|
|
|
|
|
class BaseAddressPattern(abc.ABC):
    # Abstract base class of the patterns a mail address can be matched
    # against. Subclasses match either: full address, domain part, regex.
    __slots__ = ()

    def __init__(self) -> None:
        pass

    @abc.abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    def matches(self, address: str) -> bool:
        raise NotImplementedError()

    @staticmethod
    @parser_action
    def from_unquoted_parse_results(toks: pyparsing.ParseResults) -> list[BaseAddressPattern]:
        # Splits the first token at commas and turns every non-empty piece
        # into a pattern: `~...` is a regex, `@...` a domain, anything else a
        # full address.
        addresses: str = toks[0]
        patterns: list[BaseAddressPattern] = []
        for piece in addresses.strip().split(','):
            candidate = piece.strip()
            if not candidate:
                # empty piece, e.g. from a doubled or trailing comma
                continue
            marker, remainder = candidate[:1], candidate[1:]
            if marker == '~':
                patterns.append(AddressRegexMatch(remainder))
            elif marker == '@':
                patterns.append(AddressDomainPattern(remainder))
            else:
                patterns.append(AddressPattern(candidate))
        return patterns
|
|
|
|
|
|
class AddressPattern(BaseAddressPattern):
    # Matches the complete address exactly (ignoring case and one trailing dot).
    __slots__ = ('address',)

    def __init__(self, address: str) -> None:
        super().__init__()
        # normalize once here; matches() normalizes its input the same way
        self.address = address.lower().removesuffix('.')

    def matches(self, address: str) -> bool:
        candidate = address.lower().removesuffix('.')
        return candidate == self.address

    def __repr__(self) -> str:
        return repr(self.address)

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> BaseAddressPattern:
        # a leading `@` selects a domain-only pattern
        address: str = toks[0]
        if not address.startswith('@'):
            return AddressPattern(address)
        return AddressDomainPattern(address[1:])
|
|
|
|
|
|
class AddressDomainPattern(BaseAddressPattern):
    # Matches an address by its domain part only (ignoring case and one
    # trailing dot).
    __slots__ = ('domain',)

    def __init__(self, domain: str) -> None:
        super().__init__()
        self.domain = domain.lower().removesuffix('.')

    def matches(self, address: str) -> bool:
        # the domain is whatever follows the last `@`
        _local, separator, domain = address.rpartition('@')
        if not separator:
            # no `@` at all, e.g. "MAILER-DAEMON"
            return False
        return domain.lower().removesuffix('.') == self.domain

    def __repr__(self) -> str:
        return repr("@" + self.domain)
|
|
|
|
|
|
class AddressRegexMatch(BaseAddressPattern):
    # Matches an address with a regular expression (searched anywhere in the
    # lower-cased address, one trailing dot removed).
    __slots__ = ('address',)

    def __init__(self, address: str | re.Pattern[str]) -> None:
        super().__init__()
        # accept a ready-made pattern as is, compile plain strings
        self.address = address if isinstance(address, re.Pattern) else re.compile(address)

    def matches(self, address: str) -> bool:
        candidate = address.lower().removesuffix('.')
        return self.address.search(candidate) is not None

    def __repr__(self) -> str:
        return '~' + repr(self.address.pattern)

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> AddressRegexMatch:
        return AddressRegexMatch(toks[0])
|
|
|
|
|
|
class AddressFilter(SingleExprFilter):
    # Matches mails by address: any selected address matching any pattern.
    __slots__ = ('selector', 'patterns')

    def __init__(self, selector: AddressSelector, patterns: list[BaseAddressPattern]) -> None:
        self.selector = selector
        self.patterns = patterns

    def matches(self, mail: Mail) -> bool:
        return any(
            pattern.matches(address)
            for address in self.selector.extract(mail)
            for pattern in self.patterns
        )

    def possible_queues(self) -> set[QueueName]:
        # addresses say nothing about the queue a mail is in
        return ALL_QUEUE_NAMES

    def __repr__(self) -> str:
        if not self.patterns:
            # without patterns nothing can match
            return '0'
        shown = " ".join(map(repr, self.patterns))
        return f'{self.selector.value} {shown}'

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> AddressFilter:
        # toks[0] is the selector keyword, toks[1] the group of patterns
        selector = AddressSelector(toks[0])
        patterns = list(toks[1])
        return AddressFilter(selector=selector, patterns=patterns)
|
|
|
|
|
|
# Build the parser for filter expressions; the returned callable turns a
# filter string into a Filter (pyparsing errors propagate to the caller).
def build_filter_parser() -> typing.Callable[[str], Filter]:
    # --- address expressions: `from|to|address [=] <patterns>` ---
    selector_kw = pyparsing.MatchFirst((
        pyparsing.Keyword(sel.value)
        for sel in AddressSelector
    ))
    single_quoted = pyparsing.QuotedString(quoteChar="'", escChar='\\', unquoteResults=True)
    double_quoted = pyparsing.QuotedString(quoteChar='"', escChar='\\', unquoteResults=True)
    quoted = single_quoted | double_quoted
    # `~"regex"`: quoted regular expression
    quoted_regex = pyparsing.Suppress('~') + quoted
    quoted_regex = quoted_regex.setParseAction(AddressRegexMatch.from_parse_results)
    # `"address"` / `"@domain"`: quoted literal pattern
    quoted_literal = quoted.copy().setParseAction(AddressPattern.from_parse_results)
    # unquoted, comma separated pattern list (no quotes, no whitespace)
    unquoted = pyparsing.Word(pyparsing.printables, excludeChars="'\"")
    unquoted = unquoted.setParseAction(BaseAddressPattern.from_unquoted_parse_results)
    # either one or more quoted patterns, or a single unquoted list
    pattern_list = pyparsing.MatchFirst([
        pyparsing.OneOrMore(quoted_regex | quoted_literal),
        unquoted,
    ])
    optional_equals = pyparsing.Optional(pyparsing.Suppress('='))
    addr_expr = selector_kw + optional_equals + pyparsing.Group(pattern_list)
    addr_expr = addr_expr.setParseAction(AddressFilter.from_parse_results)

    # --- queue expressions: `<name>` or `queue <name>[,<name>...]` ---
    queue_name = pyparsing.MatchFirst([pyparsing.Keyword(qn.value) for qn in QueueName])
    queue_single = queue_name
    more_queue_names = pyparsing.ZeroOrMore(pyparsing.Suppress(',') + queue_name)
    queue_multiple = (
        pyparsing.Suppress(pyparsing.Keyword('queue'))
        + queue_name
        + more_queue_names
    )
    queue_expr = (queue_single | queue_multiple).setParseAction(QueueFilter.from_parse_results)

    # --- constants ---
    const_true = pyparsing.Keyword('1').setParseAction(lambda x: TRUE_FILTER)
    const_false = pyparsing.Keyword('0').setParseAction(lambda x: FALSE_FILTER)

    # --- combine with operators; earlier entries bind tighter ---
    base_expr = addr_expr | queue_expr | const_true | const_false
    operators = [
        ('not', 1, pyparsing.opAssoc.RIGHT, NotFilter.from_parse_results),
        ('or', 2, pyparsing.opAssoc.LEFT, OrFilter.from_parse_results),
        ('and', 2, pyparsing.opAssoc.LEFT, AndFilter.from_parse_results),
    ]
    parser = pyparsing.infixNotation(base_expr, operators)

    def parse(input: str) -> Filter:
        # the whole input must be consumed; the single result is the filter
        results = parser.parseString(input, parseAll=True)
        return results[0]

    return parse
|
|
|
|
|
|
# The filter grammar is built once at import time and shared by all commands.
FILTER_PARSER = build_filter_parser()
|
|
|
|
|
|
# TODO: proper type hinting for decorator builder?
# Decorator factory marking methods of the CLI class as commands.
# The marker is stored in the `__command` attribute of the function: either
# the explicit command name, or `()` meaning "use the name of the function".
def register_command(*, name: str | tuple[()] = ()):
    def mark(cmd: typing.Callable) -> typing.Callable:
        # the function itself is returned unchanged apart from the marker
        setattr(cmd, '__command', name)
        return cmd

    return mark
|
|
|
|
|
|
class CLI:
|
|
def __init__(self, source: Source) -> None:
    # Collects every method marked with `__command` into the command tables:
    # "simple" commands can be called without argument, "long" commands take
    # the rest of the input line as argument (a command can be both).
    self.source = source
    self.current_filter: AndFilter = AndFilter()
    self._simple_commands: dict[str, typing.Callable[[], typing.Awaitable[None]]] = {}
    self._long_commands: dict[str, typing.Callable[[str], typing.Awaitable[None]]] = {}
    for attr_name in dir(self):
        member = getattr(self, attr_name)
        if not callable(member):
            continue
        marker = getattr(member, '__command', None)
        if marker is None:
            # not a registered command
            continue
        if marker == ():
            # no explicit name given: the method name is the command name
            cmd_name = attr_name
        else:
            assert isinstance(marker, str)
            cmd_name = marker
        # `self` is already bound, so only the command argument (if any) is left
        params = list(inspect.signature(member).parameters.values())
        if len(params) > 1:
            raise TypeError('Commands must take zero or one arguments')
        takes_argument = len(params) == 1
        argument_optional = takes_argument and params[0].default is not inspect.Parameter.empty
        if not takes_argument or argument_optional:
            self._simple_commands[cmd_name] = member
        if takes_argument:
            self._long_commands[cmd_name] = member
    self._known_commands = set(self._simple_commands) | set(self._long_commands)
    self._init_readline()
    self._completion_cache: list[str] = []
|
|
|
|
def _readline_completer(self, text: str, state: int) -> str | None:
|
|
text_lstripped = text.lstrip()
|
|
front_space = text[:len(text) - len(text_lstripped)]
|
|
text = text_lstripped
|
|
if len(text.split(maxsplit=1)) > 1:
|
|
# not in first word (command) anymore - no more completions
|
|
return None
|
|
if text.rstrip() != text:
|
|
# whitespace after first word (command) - no more completions
|
|
return None
|
|
if state == 0:
|
|
self._completion_cache = [
|
|
front_space + cmd
|
|
for cmd in self._known_commands
|
|
if cmd.startswith(text)
|
|
]
|
|
if state >= len(self._completion_cache):
|
|
return None
|
|
return self._completion_cache[state]
|
|
|
|
def _init_readline(self) -> None:
    # Configure readline: tab completion for commands and a persistent
    # history in ~/.pqm_history.
    readline.read_init_file()

    # completer
    readline.parse_and_bind("tab: complete")
    readline.set_completer(self._readline_completer)
    readline.set_completer_delims("")  # want to see full buffer in completer

    # history handling:
    self._history_file = os.path.join(os.path.expanduser("~"), ".pqm_history")
    with contextlib.suppress(FileNotFoundError):
        # no history yet is fine
        readline.read_history_file(self._history_file)
    history_len_at_start = readline.get_current_history_length()
    readline.set_auto_history(True)
    readline.set_history_length(100)

    def finish_readline() -> None:
        # at exit: append only the entries added during this session
        added = readline.get_current_history_length() - history_len_at_start
        if added <= 0:
            return
        try:
            readline.append_history_file(added, self._history_file)
        except FileNotFoundError:
            # nothing to append to - write a new history file instead
            readline.write_history_file(self._history_file)

    atexit.register(finish_readline)
|
|
|
|
@register_command()
async def help(self) -> None:
    """Show all available commands"""
    fallback = 'No help available'
    overview: list[tuple[str, str]] = []
    for cmd, simple_func in self._simple_commands.items():
        # the same function in both tables means the argument is optional
        optional_arg = self._long_commands.get(cmd, None) is simple_func
        usage = f'{cmd} [<arg>]' if optional_arg else cmd
        overview.append((usage, simple_func.__doc__ or fallback))
    overview.extend(
        (f'{cmd} <arg>', long_func.__doc__ or fallback)
        for cmd, long_func in self._long_commands.items()
        # commands with optional argument were listed above already
        if self._simple_commands.get(cmd, None) is not long_func
    )
    for usage, doc in sorted(overview):
        # only the first line of the documentation goes into the overview
        summary = doc.strip().splitlines()[0]
        print(f'{usage:<30} {summary}', file=sys.stderr)
    print(file=sys.stderr)
|
|
|
|
@register_command(name="help")
async def help_command(self, args: str) -> None:
    """Show detailed help for a command"""
    command = args.strip()
    if command not in self._known_commands:
        print(f'Command {command} not known, no help available', file=sys.stderr)
        return

    def describe(usage: str, func: typing.Any) -> None:
        # usage line, blank line, cleaned-up docstring, blank line
        print(f'>>>> {usage}', file=sys.stderr)
        print(file=sys.stderr)
        print(inspect.cleandoc(func.__doc__ or 'No help available'), file=sys.stderr)
        print(file=sys.stderr)

    simple_command = self._simple_commands.get(command, None)
    long_command = self._long_commands.get(command, None)
    if simple_command is long_command:
        # one function serves both forms: the argument is optional
        describe(f'{command} [args]', simple_command)
        return
    if simple_command:
        describe(command, simple_command)
    if long_command:
        describe(f'{command} <args>', long_command)
|
|
|
|
@register_command()
async def exit(self) -> None:
    """Exit pqm (or press Ctrl-D on empty prompt)"""
    # raising KeyboardInterrupt is how a command leaves the main loop
    raise KeyboardInterrupt()
|
|
|
|
def prompt_if_empty_filter(self) -> bool:
    # Returns True if the caller may go ahead: either filters are configured,
    # or the user confirmed to continue without any.
    if not self.current_filter.expressions:
        print(
            "You currently have no filters configured, which means this action will "
            "potentially run for a lot of mails",
            file=sys.stderr,
        )
        answer = input_ack("Continue anyway (y/N)? ")
        # anything not starting with y/Y counts as "no"
        return answer.lower().startswith("y")
    return True
|
|
|
|
@register_command()
async def health(self) -> None:
    """
    Show generic stats for queues and overly active address

    If the current filter isn't empty show queue counts for both matching and all mails.
    Only shows overly active addresses for mails matching the current filter (or all, if not set); never includes mails on hold.
    """
    # queues that get a line of their own; everything else is summed as "other"
    tracked = (QueueName.ACTIVE, QueueName.DEFERRED, QueueName.HOLD)
    # count_*: mails matching the current filter, total_*: all mails
    count_q = dict.fromkeys(tracked, 0)
    count_q_other = 0
    total_q = dict.fromkeys(tracked, 0)
    total_q_other = 0
    stats = UserActivityStats()
    for item in await self.source.get_list():
        if item.queue_name in total_q:
            total_q[item.queue_name] += 1
        else:
            total_q_other += 1
        if not self.current_filter.matches(item):
            continue
        if item.queue_name in count_q:
            count_q[item.queue_name] += 1
        else:
            count_q_other += 1
        stats.count(item)
    stats.prepare()
    servers = self.source.server_list()
    print(f"Servers: {len(servers)} ({', '.join(servers)})")
    if self.current_filter.expressions:
        # with a filter: show matching counts next to the totals
        print(f"Active: {count_q[QueueName.ACTIVE]} (total: {total_q[QueueName.ACTIVE]})")
        print(f"Deferred: {count_q[QueueName.DEFERRED]} (total: {total_q[QueueName.DEFERRED]})")
        print(f"Hold: {count_q[QueueName.HOLD]} (total: {total_q[QueueName.HOLD]})")
        if count_q_other:
            # fixed: the closing parenthesis after the total was missing
            print(f"Other: {count_q_other} (total: {total_q_other})")
    else:
        print(f"Active: {count_q[QueueName.ACTIVE]}")
        print(f"Deferred: {count_q[QueueName.DEFERRED]}")
        print(f"Hold: {count_q[QueueName.HOLD]}")
        if count_q_other:
            print(f"Other: {count_q_other}")
    print()
    print("Looking for users with more many open communications matching the current search criteria")
    # extract_highest_user() yields (user, activity, mails) until it returns
    # something falsy
    while entry := stats.extract_highest_user():
        user, activity, _mails = entry
        print(f"User {user} has {activity} open communications")
|
|
|
|
# one-character marker per queue (listed in the help text of the list
# commands as the flag shown after the queue id)
QUEUE_FLAGS = {
    QueueName.HOLD: '!',
    QueueName.ACTIVE: '*',
    QueueName.CORRUPT: '?',
    QueueName.INCOMING: '$',
    QueueName.MAILDROP: '~',
    QueueName.DEFERRED: ' ',
}
|
|
|
|
async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None:
    # Shared implementation of the list commands.
    #
    # args:    optional extra filter expression, combined with the current
    #          filter for this call only
    # verbose: passed on to the print method of each mail
    # all:     if false and no filter is in effect, hide active and hold queue
    flt: AndFilter
    if args:
        # additional restrictions
        try:
            parsed_flt = FILTER_PARSER(args)
        except pyparsing.ParseException as e:
            # diagnostics go to stderr so they don't mix with the listing
            print(pyparsing.ParseException.explain(e, 0), file=sys.stderr)
            return
        flt = self.current_filter & parsed_flt
    else:
        flt = self.current_filter
    show_filter: Filter
    if not all and not flt.expressions:
        # by default hide active and hold, unless -all variant is used or specific filters are used
        show_filter = NotFilter(QueueFilter([QueueName.ACTIVE, QueueName.HOLD]))
    else:
        show_filter = flt
    for item in show_filter.filter(await self.source.get_list()):
        item.print(verbose=verbose)
|
|
|
|
async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]:
    # Looks up the given queue IDs in the current mail list, prints every
    # mail found (so the user can confirm an action on them) and reports IDs
    # that could not be found. Returns the mails found, in the given order.
    known = {
        mail.queue_id: mail
        for mail in await self.source.get_list()
    }
    found = []
    unknown = []
    for queue_id in given_ids:
        mail = known.get(queue_id, None)
        if not mail:
            unknown.append(queue_id)
            continue
        mail.print(verbose=False)
        found.append(mail)
    if unknown:
        print(f"Couldn't find mails with the following IDs: {' '.join(unknown)}", file=sys.stderr)
    return found
|
|
|
|
@register_command()
async def list(self, args: str = '') -> None:
    """
    List all mails (single line per mail); if no filter is configured hide active and hold queue

    Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
    Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
    """
    # short output, default queue selection
    await self.list_impl(
        args=args,
        verbose=False,
        all=False,
    )
|
|
|
|
@register_command(name="list-verbose")
async def list_verbose(self, args: str = '') -> None:
    """
    List all mails (verbose output); if no filter is configured hide active and hold queue

    Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
    Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
    """
    # verbose output, default queue selection
    await self.list_impl(
        args=args,
        verbose=True,
        all=False,
    )
|
|
|
|
@register_command(name="list-all")
async def list_all(self, args: str = '') -> None:
    """
    List all mails (including active + hold queue, single line per mail)

    Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
    Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
    """
    # short output, no queue hidden
    await self.list_impl(
        args=args,
        verbose=False,
        all=True,
    )
|
|
|
|
@register_command(name="list-verbose-all")
async def list_verbose_all(self, args: str = '') -> None:
    """
    List all mails (including active + hold queue, verbose output)

    Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
    Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
    """
    # verbose output, no queue hidden
    await self.list_impl(
        args=args,
        verbose=True,
        all=True,
    )
|
|
|
|
@register_command(name="select")
async def select(self, args: str) -> None:
    """
    Add filter for mails that other commands work on

    Filter syntax:
    - `from bob@example.com`, `from alice@example.com,bob@example.com,@example.net`, `from "alice@example.com" "bob@example.com"`
      Multiple addresses can be given to match any of them; either unquoted and comma separated, or quoted and whitespace
      separated. In the unquoted case the (comma separated) pattern must not include any whitespace.
      An address starting with `@` only checks the domain name and ignores the local part.
    - `from ~regex.*@example.(com|net)`, `from ~"regex.*@example.(com|net)" "other@example.com"`
      To use regular expressions either put `~` in front of an unquoted pattern (need to repeat for each regex in a
      comma separated pattern list) or before the quotes of a quoted pattern.
    - `to ...` and `address ...` (matching both to and from) work the same as `from ...`
    - `queue hold,deferred`
      Comma separated list of queues a mail must be in. For single queue names `queue` can be omitted, e.g. `hold`.
    - `$expr1 and $expr2`
    - `$expr1 or $expr2` (`a and b or c` is parsed as `a and (b or c)`)
    - `($expr)`
    - `not $expr`
    """
    try:
        flt = FILTER_PARSER(args)
    except pyparsing.ParseException as e:
        # report the parse problem on stderr and leave the filters untouched
        print(pyparsing.ParseException.explain(e, 0), file=sys.stderr)
        return
    # the new filter is added to the conjunction of configured filters
    self.current_filter.expressions.append(flt)
|
|
|
|
@register_command()
async def clear(self) -> None:
    """Clear all filters"""
    # rebind to a new empty list instead of emptying the old one in place
    self.current_filter.expressions = list()
|
|
|
|
@register_command(name="clear")
async def clear_ndx(self, args: str) -> None:
    """Clear filter with given index (zero based; first filter is at index 0)"""
    try:
        ndx = int(args)
    except ValueError:
        # not a number: report it instead of failing with an exception
        print(f"Invalid filter index: {args.strip()!r}", file=sys.stderr)
        return
    try:
        self.current_filter.expressions.pop(ndx)
    except IndexError:
        count = len(self.current_filter.expressions)
        print(f"No filter at index {ndx} ({count} filters configured)", file=sys.stderr)
|
|
|
|
@register_command(name="pop")
async def pop(self) -> None:
    """Remove last filter"""
    configured = self.current_filter.expressions
    if not configured:
        print("No filters configured, nothing to pop", file=sys.stderr)
        return
    configured.pop()
|
|
|
|
@register_command()
async def current(self) -> None:
    """Show current filters"""
    # one line per filter, written the way it could be entered again
    for expression in self.current_filter.expressions:
        print(f'select {expression}', file=sys.stderr)
|
|
|
|
# Argument parser of the `show` command. The automatic help option is
# disabled because `-h` selects the header here.
_SHOW_ARGS_PARSER = argparse.ArgumentParser('show', description="Show mails", add_help=False)
_SHOW_ARGS_PARSER.add_argument('-b', dest='body', action='store_true', help="Show body content")
_SHOW_ARGS_PARSER.add_argument('-e', dest='envelope', action='store_true', help="Show message envelope content.")
_SHOW_ARGS_PARSER.add_argument('-h', dest='header', action='store_true', help="Show message header content.")
_SHOW_ARGS_PARSER.add_argument('ID', nargs='+')

@register_command()
async def show(self, args: str) -> None:
    # (no docstring here: it is generated from the argument parser below)
    try:
        ns_args = self._SHOW_ARGS_PARSER.parse_args(shlex.split(args))
    except ValueError as e:
        # shlex could not split the line, e.g. unbalanced quotes
        print(f"Invalid arguments: {e}", file=sys.stderr)
        return
    except SystemExit:
        # argparse printed usage and error already; a mistyped option must
        # not terminate the whole session
        return

    # pass the selected parts on as flags; default to header only for now
    requested = (
        ('-b', ns_args.body),
        ('-e', ns_args.envelope),
        ('-h', ns_args.header),
    )
    flags = [flag for flag, enabled in requested if enabled] or ['-h']

    head = 16 * "-"

    async def print_mail(result: tuple[str, bytes, bytes]) -> None:
        # separator with the ID, then error output (if any), then the mail
        msg, mail, stderr = result
        print(f'{head} {msg:^16} {head}')
        sys.stdout.flush()
        if stderr:
            sys.stderr.buffer.write(stderr)
            sys.stderr.flush()
        sys.stdout.buffer.write(mail)
        sys.stdout.flush()

    async def get_mail(msg: str) -> tuple[str, bytes, bytes]:
        # fetch one mail; make sure both outputs end with a newline
        mail, err = await self.source.get_mail(msg, flags=flags)
        if err and not err.endswith(b'\n'):
            err += b'\n'
        mail = mail or b''
        if mail and not mail.endswith(b'\n'):
            mail += b'\n'
        return (msg, mail, err)

    # results are handed to print_mail by trio_parallel_ordered
    async with trio_parallel_ordered(print_mail) as tpo:
        for msg in ns_args.ID:
            await tpo.start(get_mail, msg)

    print(head + 18*"-" + head)

# the help text of the command is the usage generated by argparse
show.__doc__ = _SHOW_ARGS_PARSER.format_help()
|
|
|
|
@register_command(name="flush-all")
async def flush_all(self) -> None:
    """
    Flush the queue: attempt to deliver all queued mail.

    Ignores the current filters (but prompts if you have filters).
    """
    filters_configured = bool(self.current_filter.expressions)
    if filters_configured:
        # warn: the filters are not going to be applied
        print(
            "You have currently configured filters, but this command will flush all mails, whether they match the filter or not.\n"
            "You might want to use the `flush` command instead (or clear the filters before)",
            file=sys.stderr,
        )
        answer = input_ack("Continue anyway (y/N)? ")
        if not answer.lower().startswith("y"):
            return
    await self.source.flush_all()
|
|
|
|
@register_command()
async def flush(self) -> None:
    """Flush (deferred) selected mails in the queue: attempt to deliver them"""
    if self.prompt_if_empty_filter():
        # only mails in the deferred queue are flushed
        deferred_selected = QueueFilter([QueueName.DEFERRED]) & self.current_filter
        all_mails = await self.source.get_list()
        await self.source.flush(deferred_selected.filter_ids(all_mails))
|
|
|
|
@register_command(name="flush")
async def flush_args(self, args: str) -> None:
    """Flush (deferred) mails with given IDs: attempt to deliver them"""
    # IDs are separated by whitespace
    queue_ids = args.strip().split()
    await self.source.flush(queue_ids)
|
|
|
|
async def delete_impl(self, flt: Filter) -> None:
    # Shows the mails matching `flt`, asks for confirmation and deletes them.
    selected = list(flt.filter(await self.source.get_list()))
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really delete those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    # restrict to selected queues to prevent accidents
    allowed_queues = flt.possible_queues()
    await self.source.delete((mail.queue_id for mail in selected), from_queues=allowed_queues)
|
|
|
|
@register_command()
async def delete(self) -> None:
    """Delete selected mails from the hold queue"""
    if self.current_filter.expressions:
        # restrict to hold queue
        on_hold_selected = QueueFilter([QueueName.HOLD]) & self.current_filter
        await self.delete_impl(on_hold_selected)
    else:
        print("Command not allowed without filter")
|
|
|
|
@register_command(name="delete-all")
async def delete_all(self) -> None:
    """Delete selected mails from all queues"""
    if self.current_filter.expressions:
        # no queue restriction beyond what the filter itself says
        await self.delete_impl(self.current_filter)
    else:
        print("Command not allowed without filter")
|
|
|
|
@register_command(name="delete")
async def delete_args(self, args: str) -> None:
    """Delete mails with given IDs from the queues"""
    # show what was found for the given IDs before asking
    found = await self._print_mails_with_ids_for_ack(args.strip().split())
    delete_list = [mail.queue_id for mail in found]
    answer = input_ack("Really delete those mails (y/N)? ")
    if answer.lower().startswith("y"):
        await self.source.delete(delete_list)
|
|
|
|
@register_command()
async def hold(self) -> None:
    """Put selected mails on hold"""
    if not self.prompt_if_empty_filter():
        return
    # basically: skip mails that are already on hold
    holdable = QueueFilter([QueueName.INCOMING, QueueName.ACTIVE, QueueName.DEFERRED])
    flt = holdable & self.current_filter
    queues = flt.possible_queues()
    queue_ids = flt.filter_ids(await self.source.get_list())
    await self.source.hold(queue_ids, from_queues=queues)
|
|
|
|
@register_command(name="hold")
async def hold_args(self, args: str) -> None:
    """Put mails with given IDs on hold"""
    # IDs are separated by whitespace
    queue_ids = args.strip().split()
    await self.source.hold(queue_ids)
|
|
|
|
@register_command()
async def release(self) -> None:
    """Release mail that was put "on hold"."""
    if not self.prompt_if_empty_filter():
        return
    # only mails in the hold queue are released
    flt = QueueFilter([QueueName.HOLD]) & self.current_filter
    queues = flt.possible_queues()
    queue_ids = flt.filter_ids(await self.source.get_list())
    await self.source.release(queue_ids, from_queues=queues)
|
|
|
|
@register_command(name="release")
async def release_args(self, args: str) -> None:
    """Release mails with given IDs that were put "on hold"."""
    # IDs are separated by whitespace
    queue_ids = args.strip().split()
    await self.source.release(queue_ids)
|
|
|
|
@register_command()
async def requeue(self) -> None:
    """Requeue mail (move to "maildrop"; get reprocessed)"""
    # always ask first, then (if needed) warn about the missing filter
    answer = input_ack("Requeue is a rather unusual operation. Do you know what you are doing and want to continue (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    if not self.prompt_if_empty_filter():
        return
    flt = self.current_filter
    queues = flt.possible_queues()
    queue_ids = flt.filter_ids(await self.source.get_list())
    await self.source.requeue(queue_ids, from_queues=queues)
|
|
|
|
@register_command(name="requeue")
async def requeue_args(self, args: str) -> None:
    """Requeue mail with given IDs (move to "maildrop"; get reprocessed)"""
    answer = input_ack("Requeue is a rather unusual operation. Do you know what you are doing and want to continue (y/N)? ")
    if answer.lower().startswith("y"):
        # IDs are separated by whitespace
        queue_ids = args.strip().split()
        await self.source.requeue(queue_ids)
|
|
|
|
@register_command()
async def expire(self) -> None:
    """Expire mails and flush them (return error to sender)."""
    if not self.prompt_if_empty_filter():
        return
    flt = self.current_filter
    # show the selected mails before asking for confirmation
    selected = list(flt.filter(await self.source.get_list()))
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    queues = flt.possible_queues()
    await self.source.expire((mail.queue_id for mail in selected), from_queues=queues)
    # afterwards flush those that were in the deferred queue
    await self.source.flush(
        mail.queue_id
        for mail in selected
        if mail.queue_name == QueueName.DEFERRED
    )
|
|
|
|
@register_command(name="expire")
async def expire_args(self, args: str) -> None:
    """Expire mails (return error to sender) with given IDs and flush them."""
    # Look up and display the mails for the given IDs before confirming.
    requested_ids = args.strip().split()
    selected = await self._print_mails_with_ids_for_ack(requested_ids)
    answer = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire(all_ids)
    # Only mails that were listed in the deferred queue are flushed.
    deferred_ids = (
        mail.queue_id
        for mail in selected
        if mail.queue_name == QueueName.DEFERRED
    )
    await self.source.flush(deferred_ids)
|
|
|
|
@register_command(name="expire-release")
async def expire_release(self) -> None:
    """Expire mails and flush them (return error to sender); release if mails are on hold."""
    if not self.prompt_if_empty_filter():
        return
    active_filter = self.current_filter
    listing = await self.source.get_list()
    selected = list(active_filter.filter(listing))
    # Show the affected mails (short form) before asking for confirmation.
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    source_queues = active_filter.possible_queues()
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire_release(all_ids, from_queues=source_queues)
    # Flush the mails that were listed in the deferred or hold queue.
    flushable_queues = (QueueName.DEFERRED, QueueName.HOLD)
    flush_ids = (
        mail.queue_id
        for mail in selected
        if mail.queue_name in flushable_queues
    )
    await self.source.flush(flush_ids)
|
|
|
|
@register_command(name="expire-release")
async def expire_release_args(self, args: str) -> None:
    """Expire mails (return error to sender) with given IDs and flush them; release if mails are on hold."""
    # Look up and display the mails for the given IDs before confirming.
    requested_ids = args.strip().split()
    selected = await self._print_mails_with_ids_for_ack(requested_ids)
    answer = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire_release(all_ids)
    # Flush the mails that were listed in the deferred or hold queue.
    flushable_queues = (QueueName.DEFERRED, QueueName.HOLD)
    flush_ids = (
        mail.queue_id
        for mail in selected
        if mail.queue_name in flushable_queues
    )
    await self.source.flush(flush_ids)
|
|
|
|
@register_command(name="expire-noflush")
async def expire_noflush(self) -> None:
    """Expire mails (return error to sender on next delivery attempt)."""
    if not self.prompt_if_empty_filter():
        return
    active_filter = self.current_filter
    listing = await self.source.get_list()
    selected = list(active_filter.filter(listing))
    # Show the affected mails (short form) before asking for confirmation.
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really expire those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    source_queues = active_filter.possible_queues()
    all_ids = (mail.queue_id for mail in selected)
    # No flush afterwards, unlike the plain "expire" command.
    await self.source.expire(all_ids, from_queues=source_queues)
|
|
|
|
@register_command(name="expire-noflush")
async def expire_noflush_args(self, args: str) -> None:
    """Expire mails (return error to sender on next delivery attempt) with given IDs."""
    # Look up and display the mails for the given IDs before confirming.
    requested_ids = args.strip().split()
    selected = await self._print_mails_with_ids_for_ack(requested_ids)
    answer = input_ack("Really expire those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    all_ids = (mail.queue_id for mail in selected)
    # No flush afterwards, unlike the plain "expire" command.
    await self.source.expire(all_ids)
|
|
|
|
@register_command(name="forward-to")
async def forward_to(self, args: str) -> None:
    """
    Forward mails to given recipients.

    Specify recipients as simple mail addresses (e.g. `alice@example.com`, not `Alice <alice@example.com>`).
    """
    # `args` holds the whitespace-separated recipient addresses.
    if not args:
        print("No recipients given")
        return
    # Give the user a chance to back out when no filter is active.
    if not self.prompt_if_empty_filter():
        return
    mails = list(self.current_filter.filter(await self.source.get_list()))
    # Show the affected mails (short form) before asking for confirmation.
    for item in mails:
        item.print(verbose=False)
    ack = input_ack("Really forward those mails (y/N)? ")
    # Anything that does not start with "y"/"Y" counts as "no".
    if ack.lower()[:1] != "y":
        return
    await self.source.forward_to(mails=mails, recipients=args.split())
|
|
|
|
async def prompt(self) -> None:
    """
    Main loop of the CLI.

    Reads one line per iteration with the `# ` prompt, splits it into a
    command name and an optional argument string, and dispatches it:
    commands with arguments are looked up in `self._long_commands`,
    commands without in `self._simple_commands`. Ctrl-C at the prompt
    starts a fresh prompt; Ctrl-D (EOF) ends the loop. Exceptions raised
    by a command (including KeyboardInterrupt) are printed as a traceback
    and the loop continues.
    """
    while True:
        try:
            full_line = input('# ')
        except KeyboardInterrupt:  # ctrl-c
            print('^C', file=sys.stderr)
            continue
        except EOFError:  # ctrl-d
            print()  # newline after prompt
            return
        stripped_line = full_line.strip()
        if stripped_line != full_line:
            # Only touch the history when its newest entry really is the
            # line just read. input() does not record lines when stdin is
            # not a terminal; with an empty history the index below would
            # be -1 (ValueError), and with a preloaded history an
            # unrelated entry would be modified.
            history_len = readline.get_current_history_length()
            # get_history_item() is 1-based, remove/replace are 0-based.
            if history_len > 0 and readline.get_history_item(history_len) == full_line:
                if full_line.startswith(' '):
                    # don't add space prefixed lines to history
                    readline.remove_history_item(history_len - 1)
                else:
                    # don't store trailing spaces
                    readline.replace_history_item(history_len - 1, stripped_line)
        parts = stripped_line.split(maxsplit=1)
        if not parts:
            # empty line -> just prompt again
            continue
        command = parts[0]
        has_arg = len(parts) > 1
        try:
            if has_arg and command in self._long_commands:
                await self._long_commands[command](parts[1])
                continue
            elif not has_arg and command in self._simple_commands:
                await self._simple_commands[command]()
                continue
        except (Exception, KeyboardInterrupt):
            # TODO: skip first frame (only pointing to line above)
            traceback.print_exc()
            continue
        # unknown command / invalid invocation
        if command in self._simple_commands:
            print(f"Command {command} doesn't take any arguments", file=sys.stderr)
        elif command in self._long_commands:
            print(f"Command {command} requires arguments", file=sys.stderr)
        else:
            print(f"Unknown command: {command}", file=sys.stderr)
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, load the configuration and run the interactive CLI."""
    arg_parser = argparse.ArgumentParser(description="postfix (cluster) queue manager")
    arg_parser.add_argument(
        '--config', '-c',
        type=pathlib.Path,
        help="Path to config file (default: try ~/.config/pqm.yaml, /etc/pqm.yaml)",
    )
    options = arg_parser.parse_args()
    # Without --config an empty filename is passed on to Config.load.
    settings = Config.load(options.config or '')
    shell = CLI(source=settings.source())

    # A KeyboardInterrupt escaping the event loop ends the program quietly.
    with contextlib.suppress(KeyboardInterrupt):
        trio.run(shell.prompt)
|
|
|
|
|
|
# Start the CLI. NOTE: this call is unconditional (there is no
# `if __name__ == "__main__":` guard), so it also runs if the file is imported.
main()
|