pqm/pqm

1976 lines
75 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright © Universität Stuttgart
# LICENSE: MIT
from __future__ import annotations
import abc
import argparse
import atexit
import contextlib
import dataclasses
import datetime
import enum
import functools
import inspect
import io
import json
import os
import os.path
import pathlib
import shlex
import pyparsing
import re
import readline
import socket
import subprocess
import sys
import traceback
import trio
import typing
import uuid
import yaml
# Generic type variable shared by the typed helpers in this file
# (parser_action, TrioParallelOrdered, Source.chunks).
T = typing.TypeVar('T')
@dataclasses.dataclass(slots=True)
class Config:
DEFAULT_FORWARD_NOTE: str = """\
The postmaster decided you should know about the following mails in
the queue, probably because you need to fix something in your setup.
The postmaster will probably delete the queued message afterwards,
or have them expire (i.e. send a bounce message about failed delivery
to the sender).
"""
# if not remote_sources are configured -> use local queue
remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
forward_note: str = DEFAULT_FORWARD_NOTE
forward_sender: str = 'MAILER-DAEMON'
forward_sender_name: str = 'Mail Delivery System'
@staticmethod
def load(filename: str = '') -> Config:
config = Config()
if not filename:
default_paths = [
os.path.expanduser('~/.config/pqm.yaml'),
'/etc/pqm.yaml',
]
for path in default_paths:
if os.path.exists(path):
filename = path
break
else:
# no configfile -> default config
return config
with open(filename) as file:
data = yaml.safe_load(file)
assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"
rs_list = data.pop('remote-sources', [])
assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}"
for entry in rs_list:
assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}"
if ':' in entry:
alias, target = entry.split(':', maxsplit=1)
else:
target = entry
alias = entry.split('.', maxsplit=1)[0]
assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
if alias in config.remote_sources:
raise ValueError(
f'Duplicate alias {alias!r} in remote-sources'
f' (have: {config.remote_sources[alias]!r}, new: {target!r}',
)
config.remote_sources[alias] = target
config.forward_note = data.pop('forward-note', config.forward_note)
config.forward_sender = data.pop('forward-sender', config.forward_sender)
config.forward_sender_name = data.pop('forward-sender-name', config.forward_sender_name)
assert not data, f"Unknown config options: {data}"
return config
def forward_build_from(self, *, hostname: str) -> str:
if '@' in self.forward_sender:
sender = self.forward_sender
else:
sender = f'{self.forward_sender}@{hostname}'
return f'{self.forward_sender_name} <{sender}>'
def source(self) -> Source:
if self.remote_sources:
return RemoteSource(config=self, remotes=self.remote_sources)
else:
return LocalSource(config=self)
def input_ack(prompt: str) -> str:
    """Print *prompt* and read a short acknowledgement from stdin.

    Reads stdin directly instead of calling input(), because input()
    always goes through readline, which is not wanted for simple acks.
    Returns the raw line as read (at most 16 characters).
    """
    out = sys.stdout
    out.write(prompt)
    # the prompt has no trailing newline, so push it out before blocking on stdin
    out.flush()
    answer = sys.stdin.readline(16)
    return answer
def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.Callable[[pyparsing.ParseResults], T]:
    """Decorator for custom pyparsing parse actions.

    Any exception raised by *act* (including the AssertionError of a failed
    ``assert``) is re-raised as `pyparsing.ParseFatalException` carrying the
    original message, which stops parsing immediately.  The original
    exception is kept as ``__cause__`` so tracebacks show where it came from.
    """
    @functools.wraps(act)
    def wrapped(tokens: pyparsing.ParseResults) -> T:
        try:
            return act(tokens)
        except Exception as e:
            raise pyparsing.ParseFatalException(str(e)) from e
    return wrapped
def format_mail_forward(
    *,
    config: Config,
    mails: dict[str, tuple[Mail, bytes | None]],
    hostname: str,
    recipients: typing.Iterable[str],
) -> bytes:
    """Build the raw multipart/mixed message used to forward queued mails.

    Parts, in order: the configured forward note, a verbose listing of all
    mails ("queue-list.txt"), then one part per mail - either the message
    itself (message/rfc822) or a short text note if its content is missing.

    config: provides the forward note and the From: header value.
    mails: queue id -> (mail metadata, raw message or None if not retrievable).
    hostname: used in the Subject: and to complete the sender address.
    recipients: addresses, each written as its own ``To:`` header.

    Returns the complete message as bytes.
    """
    bin_buf = io.BytesIO()
    # write_through: text written to `buf` lands in `bin_buf` immediately, so
    # text writes and the raw `bin_buf.write()` below stay in order.
    # Explicit UTF-8 instead of the locale default, so non-ASCII text cannot
    # fail with UnicodeEncodeError under an ASCII locale.
    buf = io.TextIOWrapper(bin_buf, encoding='utf-8', write_through=True)

    def ensure_newline() -> None:
        # a boundary line must start at the beginning of a line
        if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
            buf.write("\n")

    buf.write(f'From: {config.forward_build_from(hostname=hostname)}\n')
    for recp in recipients:
        buf.write(f'To: <{recp}>\n')
    buf.write(f"Subject: Deferred mails on {hostname}\n")
    buf.write("MIME-Version: 1.0\n")
    boundary = f"postmaster-forward-{uuid.uuid4().hex}"
    buf.write(f"Content-Type: multipart/mixed; boundary=\"{boundary}\"\n")
    buf.write("\n")
    buf.write("This is a message with multiple parts in MIME format.\n")

    # part 1: the explanatory note
    buf.write(f"--{boundary}\n")
    buf.write("Content-Type: text/plain\n")
    buf.write("\n")
    buf.write(config.forward_note)
    ensure_newline()

    # part 2: listing of all forwarded mails
    buf.write(f"--{boundary}\n")
    buf.write("Content-Type: text/plain\n")
    buf.write("Content-Disposition: inline; filename=\"queue-list.txt\"\n")
    buf.write("\n")
    for mail, _ in mails.values():
        mail.print(verbose=True, out=buf)

    # remaining parts: one per mail
    for queue_id, (_, mail_message) in mails.items():
        # remote queue ids look like "alias/ID"; keep the filename free of '/'
        basename = queue_id.replace('/', '_')
        if mail_message is None:
            # this should be very unlikely unless you try to forward messages from the active queue
            buf.write(f"--{boundary}\n")
            buf.write("Content-Type: text/plain\n")
            buf.write(f"Content-Disposition: inline; filename=\"{basename}.txt\"\n")
            buf.write("\n")
            # terminate the line: otherwise the next boundary would be glued
            # onto this text and no longer be recognised as a boundary
            buf.write(f"Message {queue_id} couldn't be found (anymore); may have been sent.\n")
            continue
        buf.write(f"--{boundary}\n")
        buf.write("Content-Type: message/rfc822\n")
        buf.write(f"Content-Disposition: inline; filename=\"{basename}.eml\"\n")
        buf.write("\n")
        bin_buf.write(mail_message)
        # ensure parts are terminated by newline
        ensure_newline()
    buf.write(f"--{boundary}--\n")
    return bin_buf.getvalue()
class TrioParallelOrdered(typing.Generic[T]):
    """Run jobs concurrently, but hand their results to a handler in start order.

    Used by trio_parallel_ordered below.  Jobs that raise produce no result;
    their (closed, empty) result channel is skipped by the result loop.
    """
    # used by trio_parallel_ordered below
    def __init__(self, nursery: trio.Nursery, result_handler: typing.Callable[[T], typing.Awaitable[None]]) -> None:
        # basic idea: for each task create a oneshot channel (capacity 1) to deliver result;
        # put receiving channel into self._tx (-> ordered by task start time)
        # in _handle_results: get result channel (still ordered), then wait for result
        self.nursery = nursery
        self.result_handler = result_handler
        # at most 8 tasks hold the limiter (i.e. run their job) at the same time
        self.limit = trio.CapacityLimiter(8)
        send_channel: trio.MemorySendChannel[trio.MemoryReceiveChannel[T]]
        receive_channel: trio.MemoryReceiveChannel[trio.MemoryReceiveChannel[T]]
        # queue of per-job result channels, buffering up to 128 entries
        (send_channel, receive_channel) = trio.open_memory_channel(128)
        self._tx = send_channel

        async def _handle_results() -> None:
            item: trio.MemoryReceiveChannel[T]
            # ends once self._tx is closed (see close()) and the queue is drained
            async for item in receive_channel:
                try:
                    result = await item.receive()
                except trio.EndOfChannel:
                    # sender side was closed without sending: the job produced no result
                    continue
                await self.result_handler(result)

        self.nursery.start_soon(_handle_results)

    async def close(self) -> None:
        """Signal that no more jobs will be started; lets the result loop finish."""
        await self._tx.aclose()

    async def _spawn(self, job, args, kwargs, *, task_status=trio.TASK_STATUS_IGNORED) -> None:
        # Task body for one job; started through nursery.start() in start().
        tx: trio.MemorySendChannel[T]
        rx: trio.MemoryReceiveChannel[T]
        (tx, rx) = trio.open_memory_channel(1)
        # register the result channel first: this fixes the position of the result
        await self._tx.send(rx)
        async with self.limit:
            # started() is only reported once a limiter slot is held, so
            # start() blocks the caller while all slots are busy
            task_status.started()
            # closing tx on exit (also on error) wakes up the waiting result loop
            async with tx:
                result = await job(*args, **kwargs)
                await tx.send(result)

    async def start(self, job, *args, **kwargs) -> None:
        """Start ``job(*args, **kwargs)``; returns once the job got its slot."""
        await self.nursery.start(self._spawn, job, args, kwargs)
# Usage example:
#
# async def handle_result_of_some_job_function(result) -> None: ...
# async def some_job_function(foo) -> ResultType: ...
# async with trio_parallel_ordered(handle_result_of_some_job_function) as tpo:
#     for foo in biglist:
#         await tpo.start(some_job_function, foo)
@contextlib.asynccontextmanager
async def trio_parallel_ordered(
    result_handler: typing.Callable[[T], typing.Awaitable[None]],
) -> typing.AsyncGenerator[TrioParallelOrdered[T], None]:
    """Async context manager providing a TrioParallelOrdered bound to a fresh nursery.

    On normal exit of the ``async with`` body the job queue is closed; the
    nursery then waits for the remaining jobs and the result handling.
    """
    async with trio.open_nursery() as nursery:
        tpo = TrioParallelOrdered(nursery=nursery, result_handler=result_handler)
        yield tpo
        # only reached if the body finished without raising
        await tpo.close()
@dataclasses.dataclass(slots=True)
class Recipient:
"""Recipient in a postfix mail"""
address: str
# "error message" from last delivery attempt
delay_reason: str
@staticmethod
def from_json(d: dict) -> Recipient:
return Recipient(
address=d['address'],
delay_reason=d.get('delay_reason', ''),
)
class QueueName(enum.Enum):
    # Queue names; Mail.from_json() maps the "queue_name" field of the
    # decoded `postqueue -j` output onto these members, and the values are
    # passed as queue arguments to postsuper.
    MAILDROP = "maildrop"
    HOLD = "hold"
    INCOMING = "incoming"
    ACTIVE = "active"
    DEFERRED = "deferred"
    # never passed to postsuper: the _postsuper() implementations drop it as "invalid"
    CORRUPT = "corrupt"
# Set of all queue names.  Some filters return this very object from
# possible_queues(), so treat it as read-only (AndFilter copies it before
# narrowing it down in place).
ALL_QUEUE_NAMES: set[QueueName] = set(QueueName)
def json_decode_stream(data: str):
    """Yield every JSON document from *data*, a concatenation of documents.

    Documents may be separated (and surrounded) by arbitrary whitespace;
    invalid input raises json.JSONDecodeError from the decoder.
    """
    decoder = json.JSONDecoder()
    # trailing whitespace is not part of any document
    end = len(data.rstrip())
    cursor = 0
    while cursor < end:
        if data[cursor].isspace():
            # raw_decode does not accept leading whitespace: skip it here
            cursor += 1
            continue
        document, cursor = decoder.raw_decode(data, cursor)
        yield document
@dataclasses.dataclass(slots=True)
class Mail:
"""Metadata for mail in postfix queue"""
queue_name: QueueName
queue_id: str
arrival_time: datetime.datetime
message_size: int
sender: str
recipients: list[Recipient]
# forced_expire: bool
@staticmethod
def from_json(d: dict) -> Mail:
return Mail(
queue_name=QueueName(d['queue_name']),
queue_id=d['queue_id'],
arrival_time=datetime.datetime.fromtimestamp(d['arrival_time']),
message_size=d['message_size'],
sender=d['sender'],
recipients=[Recipient.from_json(recp) for recp in d['recipients']],
)
@staticmethod
def read_postqueue_json(data: str, id_prefix: str = '') -> list[Mail]:
queue = []
for obj in json_decode_stream(data):
mail = Mail.from_json(obj)
mail.queue_id = id_prefix + mail.queue_id
queue.append(mail)
return queue
def print(self, *, verbose: bool, out: typing.TextIO = sys.stdout) -> None:
flag = CLI.QUEUE_FLAGS.get(self.queue_name, ' ')
if verbose:
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} {self.sender:<60s}",
file=out,
)
if not self.recipients:
print(f"{'':21}No recipients listed for this mail?", file=out)
for recpt in self.recipients:
print(f"{'':21}{recpt.address}", file=out)
if recpt.delay_reason:
print(f"{'':29}{recpt.delay_reason}", file=out)
else:
cnt_recpts = len(self.recipients)
if cnt_recpts:
last_recpt = self.recipients[-1].address
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
f"{self.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})",
file=out,
)
else:
print(
f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
f"{self.sender:<60s} (No recipients listed for this mail?)",
file=out,
)
# abstract collection/cluster of postfix nodes (or just a single one)
class Source(abc.ABC):
    """Abstract access to the mail queues of one or more postfix servers.

    Subclasses implement listing, reading, flushing, forwarding and the
    generic `_postsuper` call; the concrete queue actions (delete, hold, ...)
    are thin wrappers around `_postsuper`.
    """
    __slots__ = ('config',)

    def __init__(self, *, config: Config) -> None:
        self.config = config

    # list of server names in collection
    @abc.abstractmethod
    def server_list(self) -> list[str]:
        raise NotImplementedError()

    # split "list" of input elements into chunks
    @typing.final
    @staticmethod
    def chunks(data: typing.Iterable[T], *, chunk_size: int = 512) -> typing.Generator[list[T], None, None]:
        """Yield the items of *data* as lists of at most *chunk_size* items.

        All chunks are full except possibly the last one; empty input yields
        nothing.  Raises ValueError (on first iteration) if *chunk_size* is
        smaller than 1.
        """
        if chunk_size < 1:
            # a list input would otherwise yield empty chunks forever
            raise ValueError(f"chunk_size must be at least 1, got {chunk_size!r}")
        if isinstance(data, list):
            # slice by offset instead of repeatedly copying the remaining tail
            for start in range(0, len(data), chunk_size):
                yield data[start:start + chunk_size]
            return
        chunk: list[T] = []
        for item in data:
            chunk.append(item)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

    # retrieve list of mails in all queues
    @abc.abstractmethod
    async def get_list(self) -> list[Mail]:
        raise NotImplementedError()

    # basically postcat, and flags are used as command line arguments to postcat
    # returns stdout and stderr of postcat
    @abc.abstractmethod
    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        raise NotImplementedError()

    @abc.abstractmethod
    async def flush_all(self) -> None:
        """Flush the queue: attempt to deliver all queued mail."""
        # postqueue -f
        raise NotImplementedError()

    @abc.abstractmethod
    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery of deferred mail with the specified queue IDs."""
        # postqueue -i MSG1 -i MSG2 -i ...
        raise NotImplementedError()

    # abstract way to call `postsuper`, action is the command line flag for the action to take
    # from_queues: () means "no restriction"; a set restricts the action to those queues
    @abc.abstractmethod
    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ) -> None:
        raise NotImplementedError()

    async def delete(self, queue_ids: typing.Iterable[str], *, from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Delete one message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-d', queue_ids=queue_ids, from_queues=from_queues)

    async def hold(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Put mail "on hold" so that no attempt is made to deliver it.
        Move one message with the named queue ID from the named mail queue(s) (default: incoming, active and deferred) to the hold queue.
        """
        await self._postsuper('-h', queue_ids=queue_ids, from_queues=from_queues)

    async def release(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Release mail that was put "on hold".
        Move one message with the named queue ID from the named mail queue(s) (default: hold) to the deferred queue.
        """
        await self._postsuper('-H', queue_ids=queue_ids, from_queues=from_queues)

    async def requeue(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Requeue the message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-r', queue_ids=queue_ids, from_queues=from_queues)

    async def expire(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Expire the message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-e', queue_ids=queue_ids, from_queues=from_queues)

    async def expire_release(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Expire and release (if in hold) the message with the named queue ID from the named mail queue(s)
        (default queues: hold, incoming, active and deferred).
        """
        await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)

    # send a summary mail containing the given queued mails to the recipients
    @abc.abstractmethod
    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        raise NotImplementedError()
class LocalSource(Source):
    """Source for the postfix instance on the local machine.

    Runs the postfix tools directly; queue ids are plain postfix ids.
    """
    __slots__ = ()

    def __init__(self, *, config: Config) -> None:
        super().__init__(config=config)

    def server_list(self) -> list[str]:
        """Return the local hostname as the only server."""
        return [socket.gethostname()]

    async def get_list(self) -> list[Mail]:
        """Return all queued mails (`postqueue -j`), sorted by arrival time."""
        res: subprocess.CompletedProcess = await trio.run_process(
            ['/usr/sbin/postqueue', '-j'],
            capture_stdout=True,
        )
        queue = Mail.read_postqueue_json(res.stdout.decode('utf-8'))
        queue = sorted(queue, key=lambda item: item.arrival_time)  # sort by incoming date, newest last
        return queue

    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        """stdout and stderr of postcat; stdout is None if we got non-zero status"""
        if not re.fullmatch(r'[A-Z0-9]+', queue_id):
            return (None, f"Invalid queue id: {queue_id!r} - expected /[A-Z0-9]+/".encode())
        # every flag is its own argument (joining the flags into one string
        # and splitting it with list() would pass single characters)
        res: subprocess.CompletedProcess = await trio.run_process(
            ['/usr/sbin/postcat', *flags, '-q', queue_id],
            capture_stdout=True,
            capture_stderr=True,
            check=False,
        )
        if res.returncode != 0:
            # expect some error was printed to stderr
            return (None, res.stderr)
        return (res.stdout, res.stderr)

    async def flush_all(self) -> None:
        """Flush the queue: attempt to deliver all queued mail."""
        await trio.run_process(['/usr/sbin/postqueue', '-f'])

    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery of the mails with the given queue ids."""
        # postqueue only deals with single messages, because the flush daemon
        # only accepts a single (flush) request per connection...
        for msg in queue_ids:
            await trio.run_process(['/usr/sbin/postqueue', '-i', msg])

    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ) -> None:
        """Run `postsuper <action> - [queues...]` with the queue ids on stdin.

        from_queues: () leaves the queue selection to postsuper; a set
        restricts the action to these queues, and an empty set (after
        dropping "corrupt") means there is nothing to do.
        """
        cmd = ['/usr/sbin/postsuper', action, '-']
        if isinstance(from_queues, set):
            from_queues = from_queues - set((QueueName.CORRUPT,))  # remove "invalid" queue names
            if from_queues:
                cmd += [qn.value for qn in from_queues]
            else:
                # empty set of queues allowed -> no queue allowed
                return
        data = ''.join(f'{msg}\n' for msg in queue_ids)
        await trio.run_process(cmd, stdin=data.encode())

    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        """Send one mail containing all given queued mails to *recipients*.

        Mails whose content cannot be read are reported on stdout and
        included as a short note instead.
        """
        mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
        for mail in mails:
            (mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
            if mail_message is None:
                print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
            mail_messages[mail.queue_id] = (mail, mail_message)
        if not mail_messages:
            print("No mails found to forward.")
            return
        message = format_mail_forward(config=self.config, mails=mail_messages, hostname=socket.getfqdn(), recipients=recipients)
        # the generated message carries its recipients in To: headers ('-t')
        cmd = ['sendmail', '-t', '-bm', '-f', '']
        await trio.run_process(cmd, stdin=message)
class RemoteSource(Source):
    """Source for a set of remote postfix servers reached via ssh (as root).

    `remotes` maps an alias to the ssh target host.  Queue ids handled by
    this class are "long ids" of the form ``alias/QUEUEID``.
    """
    __slots__ = ('remotes',)

    def __init__(self, *, config: Config, remotes: dict[str, str]) -> None:
        super().__init__(config=config)
        self.remotes = remotes

    def server_list(self) -> list[str]:
        """Return the aliases of all configured remotes."""
        return list(self.remotes)

    # split "remote mail id" into alias for remote and id on remote system
    @staticmethod
    def decode_queue_id(queue_id: str, *, allow_all: bool = False) -> tuple[str, str]:
        """Split ``alias/QUEUEID`` into (alias, QUEUEID).

        Raises ValueError if the alias is missing, the id doesn't match
        /[A-Z0-9]+/, or the id is 'ALL' while *allow_all* is false.
        """
        parts = queue_id.split('/', maxsplit=1)
        if len(parts) < 2:
            raise ValueError(f"Invalid mail id: {queue_id!r} - expected HOST/localid")
        host, queue_id = parts
        # the id ends up in a remote shell command line: only accept harmless characters
        if not re.fullmatch(r'[A-Z0-9]+', queue_id):
            raise ValueError(f"Invalid queue id: {queue_id!r} - expected /[A-Z0-9]+/")
        if not allow_all and queue_id == 'ALL':
            raise ValueError(f"Invalid queue id: {queue_id!r} - ALL not allowed")
        return (host, queue_id)

    async def get_list(self) -> list[Mail]:
        """Return the queued mails of all remotes (fetched concurrently), sorted by arrival time."""
        queue: list[Mail] = []

        async def get_host(remote_id: str, remote_host: str) -> None:
            remote_id_prefix = remote_id + '/'
            res: subprocess.CompletedProcess = await trio.run_process(
                ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, '/usr/sbin/postqueue -j'],
                capture_stdout=True,
            )
            queue.extend(Mail.read_postqueue_json(res.stdout.decode('utf-8'), id_prefix=remote_id_prefix))

        async with trio.open_nursery() as nursery:
            for remote_id, remote_host in self.remotes.items():
                nursery.start_soon(get_host, remote_id, remote_host)
        queue = sorted(queue, key=lambda item: item.arrival_time)  # sort by incoming date, newest last
        return queue

    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        """stdout and stderr of remote postcat; stdout is None on invalid id, unknown host or non-zero status"""
        try:
            host, queue_id = self.decode_queue_id(queue_id)
        except ValueError as e:
            return (None, str(e).encode())
        if host not in self.remotes:
            return (None, f"Unknown remote host: {host!r}".encode())
        remote_host = self.remotes[host]
        cmd = ['/usr/sbin/postcat', *flags, '-q', queue_id]
        res: subprocess.CompletedProcess = await trio.run_process(
            ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)],
            capture_stdout=True,
            capture_stderr=True,
            check=False,
        )
        if res.returncode != 0:
            # expect some error was printed to stderr
            return (None, res.stderr)
        return (res.stdout, res.stderr)

    async def flush_all(self) -> None:
        """Flush the queues of all remotes concurrently."""
        async def flush_host(remote_host: str) -> None:
            await trio.run_process(
                ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, '/usr/sbin/postqueue -f'],
            )

        async with trio.open_nursery() as nursery:
            for remote_host in self.remotes.values():
                nursery.start_soon(flush_host, remote_host)

    # group "long ids" (with alias/ prefix) by alias (remote host)
    def aggregate_by_host(self, queue_ids: typing.Iterable[str], *, allow_all: bool) -> dict[str, list[str]]:
        """Return alias -> list of remote-local queue ids.

        A bare 'ALL' (only if *allow_all*) adds 'ALL' for every configured
        remote.  Raises ValueError for invalid ids and unknown aliases.
        """
        by_host: dict[str, list[str]] = {}
        for long_queue_id in queue_ids:
            if long_queue_id == 'ALL':
                if not allow_all:
                    raise ValueError(f"Invalid queue id: {long_queue_id!r} - ALL not allowed")
                for host in self.remotes:
                    by_host.setdefault(host, []).append('ALL')
                continue
            host, queue_id = self.decode_queue_id(long_queue_id, allow_all=allow_all)
            by_host.setdefault(host, []).append(queue_id)
        # unknown aliases are only reported after all ids were syntax-checked
        for host, ids in by_host.items():
            if host not in self.remotes:
                raise ValueError(f"Unknown remote host {host!r}, used for {len(ids)} queue ids")
        return by_host

    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery of the given mails on their remotes."""
        by_host = self.aggregate_by_host(queue_ids=queue_ids, allow_all=False)

        async def run_host(remote_host: str, ids: list[str]) -> None:
            # chunked to keep the remote command line at a bounded length
            for run in self.chunks(ids):
                # postqueue only deals with single messages, because the flush daemon
                # only accepts a single (flush) request per connection...
                cmd = 'for msg in ' + shlex.join(run) + '; do /usr/sbin/postqueue -i "${msg}"; done'
                await trio.run_process(['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, cmd])

        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                remote_host = self.remotes[host]
                nursery.start_soon(run_host, remote_host, ids)

    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ) -> None:
        """Run `postsuper <action> - [queues...]` on every affected remote.

        Each remote receives its own queue ids on stdin.  from_queues: ()
        leaves the queue selection to postsuper; a set restricts the action
        to these queues, and an empty set (after dropping "corrupt") means
        there is nothing to do.
        """
        cmd = ['/usr/sbin/postsuper', action, '-']
        if isinstance(from_queues, set):
            from_queues = from_queues - set((QueueName.CORRUPT,))  # remove "invalid" queue names
            if from_queues:
                cmd += [qn.value for qn in from_queues]
            else:
                # empty set of queues allowed -> no queue allowed
                return
        by_host = self.aggregate_by_host(queue_ids=queue_ids, allow_all=True)

        async def run_host(remote_host: str, data: str) -> None:
            await trio.run_process(['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)], stdin=data.encode())

        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                remote_host = self.remotes[host]
                data = ''.join(f'{msg}\n' for msg in ids)
                nursery.start_soon(run_host, remote_host, data)

    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        """Forward the given mails to *recipients*, one summary mail per remote.

        Each remote sends the mail containing its own queued messages.
        Mails whose content cannot be read are reported on stdout and
        included as a short note instead.
        """
        mails_by_id = {
            mail.queue_id: mail
            for mail in mails
        }
        if not mails_by_id:
            print("No mails found to forward.")
            return

        async def run_host(host: str, remote_host: str, queue_ids: list[str]) -> None:
            mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
            for queue_id in queue_ids:
                mail = mails_by_id[f'{host}/{queue_id}']
                (mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
                if mail_message is None:
                    print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
                mail_messages[mail.queue_id] = (mail, mail_message)
            if not mail_messages:
                return
            message = format_mail_forward(config=self.config, mails=mail_messages, hostname=remote_host, recipients=recipients)
            # the generated message carries its recipients in To: headers ('-t')
            cmd = ['sendmail', '-t', '-bm', '-f', '']
            cmd = ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)]
            await trio.run_process(cmd, stdin=message)

        by_host = self.aggregate_by_host(queue_ids=mails_by_id.keys(), allow_all=True)
        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                remote_host = self.remotes[host]
                nursery.start_soon(run_host, host, remote_host, ids)
class UserActivityStats:
    """Per-address activity counters, used to find the most active addresses.

    A sender is credited with the number of recipients of each of its mails
    (mails from 'MAILER-DAEMON' give no sender credit); every distinct
    recipient of a mail is credited once.  Mails in the hold queue are
    ignored.  Addresses are compared case-insensitively.
    """
    __slots__ = ('count_mails', 'related_mails', 'cut_off')

    # find most active address for `health` command
    def __init__(self, cut_off: int = 10) -> None:
        # count recipients individually for each sender, and recipients just once
        self.count_mails: dict[str, int] = {}
        self.related_mails: dict[str, list[Mail]] = {}
        # addresses with a count below this are dropped by prepare() / extract_highest_user()
        self.cut_off = cut_off

    def _credit(self, user: str, amount: int, item: Mail) -> None:
        # add `amount` to the counter of `user` and remember the mail
        if user not in self.count_mails:
            self.count_mails[user] = 0
            self.related_mails[user] = []
        self.count_mails[user] += amount
        self.related_mails[user].append(item)

    def _discount(self, user: str, amount: int) -> None:
        # take `amount` off the counter of `user` (if still tracked) and
        # drop the address once it falls below the cut-off
        if user not in self.count_mails:
            return
        self.count_mails[user] -= amount
        if self.count_mails[user] < self.cut_off:
            del self.count_mails[user]

    @staticmethod
    def _recipient_addresses(item: Mail) -> list[str]:
        # distinct lower-cased recipient addresses, in order of first
        # occurrence (keeps the statistics reproducible between runs)
        return list(dict.fromkeys(recp.address.lower() for recp in item.recipients))

    def count(self, item: Mail) -> None:
        """Add one mail to the statistics."""
        if QueueName.HOLD == item.queue_name:
            return
        sender = item.sender.lower()
        if item.sender != 'MAILER-DAEMON':
            self._credit(sender, len(item.recipients), item)
        for recp in self._recipient_addresses(item):
            if sender == recp:
                continue  # count user just once per mail
            self._credit(recp, 1, item)

    def prepare(self) -> None:
        """Drop all addresses whose count is below the cut-off."""
        for (user, count) in list(self.count_mails.items()):
            if count < self.cut_off:
                self.count_mails.pop(user)

    def extract_highest_user(self) -> None | tuple[str, int, list[Mail]]:
        """Remove and return the most active address.

        Returns (address, count, related mails), or None if no address is
        left.  The returned mails are then taken off the counters of the
        other addresses involved in them, which may drop those addresses
        below the cut-off.
        """
        if not self.count_mails:
            return None
        # max() returns the first of several equal maxima and, unlike a scan
        # starting at 0, also works when all remaining counts are <= 0
        user, activity = max(self.count_mails.items(), key=lambda entry: entry[1])
        del self.count_mails[user]
        mails = self.related_mails.pop(user)
        for item in mails:
            sender = item.sender.lower()
            # mirror count(): MAILER-DAEMON mails never gave sender credit
            if item.sender != 'MAILER-DAEMON' and user != sender:
                self._discount(sender, len(item.recipients))
            for recp in self._recipient_addresses(item):
                if sender == recp:
                    continue  # count user just once per mail
                if user != recp:
                    self._discount(recp, 1)
        return (user, activity, mails)
class Filter(abc.ABC):
    """Abstract base class of all filter expressions.

    Filters can be combined with ``&``, ``|`` and ``~``.
    """
    __slots__ = ()

    def __init__(self) -> None:
        pass

    @abc.abstractmethod
    def matches(self, mail: Mail) -> bool:
        """Return whether *mail* is selected by this filter."""
        raise NotImplementedError()

    @abc.abstractmethod
    def possible_queues(self) -> set[QueueName]:
        """Return the queues matching mails may be in (all unless explicitly restricted)."""
        raise NotImplementedError()

    @abc.abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()

    @typing.final
    def filter(self, mails: typing.Iterable[Mail]) -> typing.Generator[Mail, None, None]:
        """Yield the mails from *mails* that match."""
        yield from (candidate for candidate in mails if self.matches(candidate))

    @typing.final
    def filter_ids(self, mails: typing.Iterable[Mail]) -> typing.Generator[str, None, None]:
        """Yield the queue ids of the mails from *mails* that match."""
        yield from (candidate.queue_id for candidate in mails if self.matches(candidate))

    def __and__(self, other: typing.Any) -> Filter:
        """Return a filter matching only mails matched by both operands."""
        if not isinstance(other, Filter):
            return NotImplemented
        # the constant filters never need a combined node
        if other is TRUE_FILTER:
            return self
        if other is FALSE_FILTER:
            return FALSE_FILTER
        # merge into an existing conjunction instead of nesting
        members = other.expressions if isinstance(other, AndFilter) else [other]
        return AndFilter([self, *members])

    def __or__(self, other: typing.Any) -> Filter:
        """Return a filter matching mails matched by at least one operand."""
        if not isinstance(other, Filter):
            return NotImplemented
        # the constant filters never need a combined node
        if other is TRUE_FILTER:
            return TRUE_FILTER
        if other is FALSE_FILTER:
            return self
        # merge into an existing disjunction instead of nesting
        members = other.expressions if isinstance(other, OrFilter) else [other]
        return OrFilter([self, *members])

    def __invert__(self) -> Filter:
        """Return a filter matching exactly the mails this filter doesn't match."""
        return NotFilter(self)
# Abstract base class for filters whose repr() can be embedded into the
# repr() of a combined filter (and / or / not) without surrounding (...).
class SingleExprFilter(Filter):
    __slots__ = ()
class FalseFilter(SingleExprFilter):
    """Constant false: matches no mail at all."""
    __slots__ = ()

    def matches(self, mail: Mail) -> bool:
        return False

    def possible_queues(self) -> set[QueueName]:
        # nothing matches, so no queue is possible
        return set()

    def __repr__(self) -> str:
        return '0'

    def __and__(self, other: typing.Any) -> Filter:
        # false and X == false
        return self if isinstance(other, Filter) else NotImplemented

    def __or__(self, other: typing.Any) -> Filter:
        # false or X == X
        return other if isinstance(other, Filter) else NotImplemented

    def __invert__(self) -> Filter:
        return TRUE_FILTER
class TrueFilter(SingleExprFilter):
    """Constant true: matches every mail."""
    __slots__ = ()

    def matches(self, mail: Mail) -> bool:
        return True

    def possible_queues(self) -> set[QueueName]:
        # returns the shared module-level set, not a copy
        return ALL_QUEUE_NAMES

    def __repr__(self) -> str:
        return '1'

    def __and__(self, other: typing.Any) -> Filter:
        # true and X == X
        return other if isinstance(other, Filter) else NotImplemented

    def __or__(self, other: typing.Any) -> Filter:
        # true or X == true
        return self if isinstance(other, Filter) else NotImplemented

    def __invert__(self) -> Filter:
        return FALSE_FILTER
# Shared instances of the constant filters; Filter.__and__ / Filter.__or__
# recognise them by identity (`is`).
FALSE_FILTER = FalseFilter()
TRUE_FILTER = TrueFilter()
class AndFilter(Filter):
    """Conjunction: matches mails matched by all sub-expressions.

    An empty conjunction matches everything (repr '1').
    """
    __slots__ = ('expressions',)

    def __init__(self, expressions: list[Filter] | tuple[()] = ()) -> None:
        super().__init__()
        # the default () is replaced by a fresh list; a passed list is stored as is
        self.expressions: list[Filter] = expressions or []

    def matches(self, mail: Mail) -> bool:
        return all(expr.matches(mail) for expr in self.expressions)

    def possible_queues(self) -> set[QueueName]:
        res = set(ALL_QUEUE_NAMES)  # clone set (it is narrowed down in place below)
        for expr in self.expressions:
            res &= expr.possible_queues()
        return res

    def __repr__(self) -> str:
        if len(self.expressions) == 0:
            return '1'
        if len(self.expressions) == 1:
            return repr(self.expressions[0])
        subexprs = []
        for e in self.expressions:
            # nested "and" and single-term filters read fine without parentheses
            if isinstance(e, (SingleExprFilter, AndFilter)):
                subexprs.append(repr(e))
            else:
                subexprs.append(f'({e!r})')
        return ' and '.join(subexprs)

    def __and__(self, other: typing.Any) -> AndFilter:
        if not isinstance(other, Filter):
            return NotImplemented
        # flatten instead of nesting conjunctions
        if isinstance(other, AndFilter):
            return AndFilter(self.expressions + other.expressions)
        return AndFilter(self.expressions + [other])

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        """Parse action: fold the group ``[f1, 'and', f2, 'and', ...]`` into one filter."""
        assert len(toks) == 1
        inner_toks = list(toks[0])
        assert len(inner_toks) % 2 == 1, f"need odd number of tokens in {toks}"
        token = inner_toks[0]
        assert isinstance(token, Filter)
        # Walk the (keyword, operand) pairs.  The asserts only check; they
        # don't consume tokens, so the result is the same if asserts are
        # disabled (python -O).
        for keyword, next_token in zip(inner_toks[1::2], inner_toks[2::2]):
            assert keyword == 'and', f"Missing 'and' in {toks}"
            assert isinstance(next_token, Filter), f"Non-filter in {toks}"
            token = token & next_token
        return token
class OrFilter(Filter):
    """Disjunction: matches mails matched by at least one sub-expression.

    An empty disjunction matches nothing (repr '0').
    """
    __slots__ = ('expressions',)

    def __init__(self, expressions: list[Filter] | tuple[()] = ()) -> None:
        super().__init__()
        # the default () is replaced by a fresh list; a passed list is stored as is
        self.expressions: list[Filter] = expressions or []

    def matches(self, mail: Mail) -> bool:
        return any(expr.matches(mail) for expr in self.expressions)

    def possible_queues(self) -> set[QueueName]:
        res: set[QueueName] = set()
        for expr in self.expressions:
            res |= expr.possible_queues()
        return res

    def __repr__(self) -> str:
        if len(self.expressions) == 0:
            return '0'
        if len(self.expressions) == 1:
            return repr(self.expressions[0])
        subexprs = []
        for e in self.expressions:
            # nested "or" and single-term filters read fine without parentheses
            if isinstance(e, (SingleExprFilter, OrFilter)):
                subexprs.append(repr(e))
            else:
                subexprs.append(f'({e!r})')
        return ' or '.join(subexprs)

    def __or__(self, other: typing.Any) -> OrFilter:
        if not isinstance(other, Filter):
            return NotImplemented
        # flatten instead of nesting disjunctions
        if isinstance(other, OrFilter):
            return OrFilter(self.expressions + other.expressions)
        return OrFilter(self.expressions + [other])

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        """Parse action: fold the group ``[f1, 'or', f2, 'or', ...]`` into one filter."""
        assert len(toks) == 1
        inner_toks = list(toks[0])
        assert len(inner_toks) % 2 == 1, f"need odd number of tokens in {toks}"
        token = inner_toks[0]
        assert isinstance(token, Filter)
        # Walk the (keyword, operand) pairs.  The asserts only check; they
        # don't consume tokens, so the result is the same if asserts are
        # disabled (python -O).
        for keyword, next_token in zip(inner_toks[1::2], inner_toks[2::2]):
            assert keyword == 'or', f"Missing 'or' in {toks}"
            assert isinstance(next_token, Filter), f"Non-filter in {toks}"
            token = token | next_token
        return token
class NotFilter(SingleExprFilter):
    """Negation: matches exactly the mails the wrapped filter doesn't match."""
    __slots__ = ('expression',)

    def __init__(self, expression: Filter) -> None:
        super().__init__()
        self.expression = expression

    def matches(self, mail: Mail) -> bool:
        return not self.expression.matches(mail)

    def possible_queues(self) -> set[QueueName]:
        inner = self.expression
        if not isinstance(inner, QueueFilter):
            # in general a negation can match mails from any queue
            return ALL_QUEUE_NAMES
        # for a queue filter we actually know the negation
        return ALL_QUEUE_NAMES - inner.select

    def __repr__(self) -> str:
        inner = repr(self.expression)
        if not isinstance(self.expression, SingleExprFilter):
            inner = f'({inner})'
        return f'not {inner}'

    def __invert__(self) -> Filter:
        # double negation: hand back the wrapped filter
        return self.expression

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        """Parse action: turn the group ``['not', filter]`` into the negated filter."""
        assert len(toks) == 1
        group = toks[0]
        assert len(group) == 2
        keyword = group[0]
        operand = group[1]
        assert keyword == 'not'
        assert isinstance(operand, Filter)
        return ~operand
class QueueFilter(SingleExprFilter):
    """match mails based on queue they are in"""
    __slots__ = ('select',)

    def __init__(self, select: list[QueueName]) -> None:
        super().__init__()
        # set of accepted queues; also returned as is by possible_queues()
        self.select = set(select)

    def matches(self, mail: Mail) -> bool:
        return mail.queue_name in self.select

    def possible_queues(self) -> set[QueueName]:
        return self.select

    def __repr__(self) -> str:
        if not self.select:
            return '0'
        # list the names in declaration order of QueueName, so the text is
        # the same on every run (iteration order of the set is not)
        names = ",".join(qn.value for qn in QueueName if qn in self.select)
        return f'queue {names}'

    def __and__(self, other: typing.Any) -> Filter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, QueueFilter):
            # two queue filters: intersect the queue sets
            select = self.select & other.select
            if select:
                return QueueFilter(list(select))
            else:
                return FALSE_FILTER
        return super().__and__(other)

    def __or__(self, other: typing.Any) -> Filter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, QueueFilter):
            # two queue filters: unite the queue sets
            select = self.select | other.select
            return QueueFilter(list(select))
        return super().__or__(other)

    # No __invert__ override: negation goes through NotFilter, whose
    # possible_queues() handles a wrapped QueueFilter.  Previous idea, kept
    # for reference:
    # def __invert__(self) -> Filter:
    #     select = ALL_QUEUE_NAMES - self.select
    #     return QueueFilter(list(select))

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> QueueFilter:
        """Parse action: build the filter from a sequence of queue name tokens."""
        return QueueFilter([QueueName(t) for t in toks])
class AddressSelector(enum.Enum):
    """Which addresses of a mail an address filter looks at."""

    SENDER = "from"
    RECIPIENT = "to"
    ANY = "address"

    def extract(self, mail: Mail) -> typing.Generator[str, None, None]:
        """Yield the selected addresses: the sender first, then each recipient."""
        if self in (AddressSelector.SENDER, AddressSelector.ANY):
            yield mail.sender
        if self in (AddressSelector.RECIPIENT, AddressSelector.ANY):
            yield from (recipient.address for recipient in mail.recipients)
class BaseAddressPattern(abc.ABC):
    """Abstract base of the patterns used to match a single mail address.

    Subclasses match either a full address, a domain part, or a regex.
    """

    __slots__ = ()

    def __init__(self) -> None:
        pass

    @abc.abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    def matches(self, address: str) -> bool:
        raise NotImplementedError()

    @staticmethod
    @parser_action
    def from_unquoted_parse_results(toks: pyparsing.ParseResults) -> list[BaseAddressPattern]:
        """Split an unquoted, comma separated pattern list into pattern objects.

        A leading ``~`` selects a regex pattern, a leading ``@`` a domain
        pattern; anything else is matched as a full address.  Empty entries
        are skipped.
        """
        raw: str = toks[0]
        result: list[BaseAddressPattern] = []
        for entry in (piece.strip() for piece in raw.strip().split(',')):
            if not entry:
                continue
            marker, remainder = entry[0], entry[1:]
            if marker == '~':
                result.append(AddressRegexMatch(remainder))
            elif marker == '@':
                result.append(AddressDomainPattern(remainder))
            else:
                result.append(AddressPattern(entry))
        return result
class AddressPattern(BaseAddressPattern):
    """Match a full address exactly (case-insensitive, one trailing dot ignored)."""

    __slots__ = ('address',)

    def __init__(self, address: str) -> None:
        super().__init__()
        lowered = address.lower()
        self.address = lowered.removesuffix('.')

    def matches(self, address: str) -> bool:
        """Compare the normalized candidate against the stored address."""
        candidate = address.lower().removesuffix('.')
        return candidate == self.address

    def __repr__(self) -> str:
        return repr(self.address)

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> BaseAddressPattern:
        """Turn one quoted token into a domain pattern (leading ``@``) or a full-address pattern."""
        text: str = toks[0]
        if not text.startswith('@'):
            return AddressPattern(text)
        return AddressDomainPattern(text[1:])
class AddressDomainPattern(BaseAddressPattern):
    """Match an address by its domain part only (text after the last ``@``)."""

    __slots__ = ('domain',)

    def __init__(self, domain: str) -> None:
        super().__init__()
        lowered = domain.lower()
        self.domain = lowered.removesuffix('.')

    def matches(self, address: str) -> bool:
        """Return True if the address has a domain equal to the stored one."""
        _local_part, at_sign, host = address.rpartition('@')
        if not at_sign:
            # no '@' at all, e.g. "MAILER-DAEMON"
            return False
        return host.lower().removesuffix('.') == self.domain

    def __repr__(self) -> str:
        return repr('@' + self.domain)
class AddressRegexMatch(BaseAddressPattern):
    """Match an address by searching it with a regular expression."""

    __slots__ = ('address',)

    def __init__(self, address: str | re.Pattern[str]) -> None:
        super().__init__()
        # accept both precompiled patterns and pattern source text
        self.address = address if isinstance(address, re.Pattern) else re.compile(address)

    def matches(self, address: str) -> bool:
        """Search the lowercased address (one trailing dot removed) for the pattern."""
        # NOTE(review): the candidate is lowercased but the pattern is used
        # as given, so uppercase literals in a pattern cannot match - confirm
        # this is intended.
        candidate = address.lower().removesuffix('.')
        found = self.address.search(candidate)
        return bool(found)

    def __repr__(self) -> str:
        return '~' + repr(self.address.pattern)

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> AddressRegexMatch:
        """Compile the first token as the regex pattern."""
        return AddressRegexMatch(toks[0])
class AddressFilter(SingleExprFilter):
    """Match mails whose selected addresses satisfy at least one pattern."""

    __slots__ = ('selector', 'patterns')

    def __init__(self, selector: AddressSelector, patterns: list[BaseAddressPattern]) -> None:
        self.selector = selector
        self.patterns = patterns

    def matches(self, mail: Mail) -> bool:
        """Return True as soon as any selected address matches any pattern."""
        return any(
            pattern.matches(address)
            for address in self.selector.extract(mail)
            for pattern in self.patterns
        )

    def possible_queues(self) -> set[QueueName]:
        # addresses say nothing about the queue a mail is in
        return ALL_QUEUE_NAMES

    def __repr__(self) -> str:
        if not self.patterns:
            return '0'
        rendered = " ".join(map(repr, self.patterns))
        return f'{self.selector.value} {rendered}'

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> AddressFilter:
        """Build the filter from ``[selector keyword, pattern group]``."""
        selector = AddressSelector(toks[0])
        patterns = list(toks[1])
        return AddressFilter(selector=selector, patterns=patterns)
# create parser for filter expressions
def build_filter_parser() -> typing.Callable[[str], Filter]:
    """Assemble the pyparsing grammar for filter expressions.

    Returns a function that parses a complete filter string (the whole
    input must be consumed) and returns the resulting Filter object.
    Parse errors are raised by pyparsing.
    """
    # one keyword per AddressSelector value
    addr_expr_selector = pyparsing.MatchFirst((
        pyparsing.Keyword(sel.value)
        for sel in AddressSelector
    ))

    # single or double quoted strings with backslash escapes; quotes are stripped
    squoted = pyparsing.QuotedString(quoteChar="'", escChar='\\', unquoteResults=True)
    dquoted = pyparsing.QuotedString(quoteChar='"', escChar='\\', unquoteResults=True)
    quoted = squoted | dquoted

    # quoted regex pattern: `~"..."`
    addr_expr_pattern_q_re = (pyparsing.Suppress('~') + quoted).setParseAction(AddressRegexMatch.from_parse_results)
    # quoted plain pattern; `quoted` is copied so the parse action is not
    # attached to the element shared with the regex variant above
    addr_expr_pattern_q = quoted.copy().setParseAction(AddressPattern.from_parse_results)
    # unquoted pattern: one word without quote characters; the parse action
    # splits it at commas into individual patterns
    addr_expr_pattern = pyparsing.Word(pyparsing.printables, excludeChars="'\"") \
        .setParseAction(BaseAddressPattern.from_unquoted_parse_results)
    # either one or more quoted patterns, or a single unquoted word
    addr_expr_patterns = pyparsing.MatchFirst([
        pyparsing.OneOrMore(addr_expr_pattern_q_re | addr_expr_pattern_q),
        addr_expr_pattern,
    ])
    # `<selector> [=] <patterns>`
    addr_expr = (
        addr_expr_selector + pyparsing.Optional(pyparsing.Suppress('=')) + pyparsing.Group(addr_expr_patterns)
    ).setParseAction(AddressFilter.from_parse_results)

    # queue filters: a bare queue name, or `queue <name>[,<name>...]`
    queue_expr_name = pyparsing.MatchFirst([pyparsing.Keyword(qn.value) for qn in QueueName])
    queue_expr_single = queue_expr_name
    queue_expr_mult = (
        pyparsing.Suppress(pyparsing.Keyword('queue'))
        + queue_expr_name
        + pyparsing.ZeroOrMore(pyparsing.Suppress(',') + queue_expr_name)
    )
    queue_expr = (queue_expr_single | queue_expr_mult).setParseAction(QueueFilter.from_parse_results)

    # constants `1` and `0` map to the always-true / always-false filters
    const_true = pyparsing.Keyword('1').setParseAction(lambda x: TRUE_FILTER)
    const_false = pyparsing.Keyword('0').setParseAction(lambda x: FALSE_FILTER)

    base_expr = addr_expr | queue_expr | const_true | const_false

    # operators are listed from tightest to loosest binding: `or` binds
    # tighter than `and`, so `a and b or c` is parsed as `a and (b or c)`
    parser = pyparsing.infixNotation(base_expr, [
        ('not', 1, pyparsing.opAssoc.RIGHT, NotFilter.from_parse_results),
        ('or', 2, pyparsing.opAssoc.LEFT, OrFilter.from_parse_results),
        ('and', 2, pyparsing.opAssoc.LEFT, AndFilter.from_parse_results),
    ])

    def parse(input: str) -> Filter:
        # parseAll=True: trailing unparsed text is an error
        return parser.parseString(input, parseAll=True)[0]

    return parse
# Parse function built once at import time; the `select` and `list*`
# commands use it to turn filter text into Filter objects.
FILTER_PARSER = build_filter_parser()
# TODO: proper type hinting for decorator builder?
def register_command(*, name: str | tuple[()] = ()):
    """Build a decorator that marks a CLI method as a command.

    The decorator stores ``name`` in the function's ``__command`` attribute
    and returns the function unchanged.  The empty-tuple default means
    "use the name of the function".
    """
    def mark_as_command(func: typing.Callable) -> typing.Callable:
        # attribute name given as a string so it is stored literally
        setattr(func, '__command', name)
        return func

    return mark_as_command
class CLI:
def __init__(self, source: Source) -> None:
    """Set up filter state, collect the registered commands and init readline.

    Every callable attribute carrying a ``__command`` marker is registered:
    commands callable without an argument go into ``_simple_commands``,
    commands accepting one argument into ``_long_commands`` (a command with
    one defaulted parameter lands in both).
    """
    self.source = source
    self.current_filter: AndFilter = AndFilter()
    self._simple_commands: dict[str, typing.Callable[[], typing.Awaitable[None]]] = {}
    self._long_commands: dict[str, typing.Callable[[str], typing.Awaitable[None]]] = {}
    for attr_name in dir(self):
        handler = getattr(self, attr_name)
        if not callable(handler):
            continue
        marker = getattr(handler, '__command', None)
        if marker is None:
            continue
        if marker == ():
            # no explicit name given: the method name is the command name
            cmd_name = attr_name
        else:
            assert isinstance(marker, str)
            cmd_name = marker
        # bound method: `self` is not part of the signature
        params = list(inspect.signature(handler).parameters.values())
        if len(params) > 1:
            raise TypeError('Commands must take zero or one arguments')
        takes_arg = len(params) == 1
        works_without_arg = (not takes_arg) or (params[0].default is not inspect.Parameter.empty)
        if works_without_arg:
            self._simple_commands[cmd_name] = handler
        if takes_arg:
            self._long_commands[cmd_name] = handler
    self._known_commands = set(self._simple_commands) | set(self._long_commands)
    self._init_readline()
    self._completion_cache: list[str] = []
def _readline_completer(self, text: str, state: int) -> str | None:
text_lstripped = text.lstrip()
front_space = text[:len(text) - len(text_lstripped)]
text = text_lstripped
if len(text.split(maxsplit=1)) > 1:
# not in first word (command) anymore - no more completions
return None
if text.rstrip() != text:
# whitespace after first word (command) - no more completions
return None
if state == 0:
self._completion_cache = [
front_space + cmd
for cmd in self._known_commands
if cmd.startswith(text)
]
if state >= len(self._completion_cache):
return None
return self._completion_cache[state]
def _init_readline(self) -> None:
    """Configure readline: init file, tab completion and persistent history.

    History is read from ``~/.pqm_history``; entries added during this
    session are appended to that file at interpreter exit.
    """
    try:
        readline.read_init_file()
    except OSError:
        # Most likely there is simply no inputrc/editrc file to read;
        # that must not prevent the CLI from starting.
        pass
    # completer
    readline.parse_and_bind("tab: complete")
    readline.set_completer(self._readline_completer)
    readline.set_completer_delims("")  # want to see full buffer in completer
    # history handling:
    self._history_file = os.path.join(os.path.expanduser("~"), ".pqm_history")
    try:
        readline.read_history_file(self._history_file)
    except FileNotFoundError:
        pass
    # remember how many entries came from the file so only new ones get appended
    previous_history_len = readline.get_current_history_length()
    readline.set_auto_history(True)
    readline.set_history_length(100)

    def finish_readline() -> None:
        new_entries = readline.get_current_history_length() - previous_history_len
        if new_entries > 0:
            try:
                readline.append_history_file(new_entries, self._history_file)
            except FileNotFoundError:
                # history file doesn't exist yet: write it from scratch
                readline.write_history_file(self._history_file)

    atexit.register(finish_readline)
@register_command()
async def help(self) -> None:
    """Show all available commands"""
    # Collect (usage, doc) pairs, then print one line per entry, sorted.
    fallback = 'No help available'
    entries: list[tuple[str, str]] = []
    for cmd, func1 in self._simple_commands.items():
        # same handler registered in both tables: the argument is optional
        arg_optional = self._long_commands.get(cmd, None) is func1
        usage = f'{cmd} [<arg>]' if arg_optional else cmd
        entries.append((usage, func1.__doc__ or fallback))
    for cmd, func2 in self._long_commands.items():
        if self._simple_commands.get(cmd, None) is func2:
            # already listed above with an optional argument
            continue
        entries.append((f'{cmd} <arg>', func2.__doc__ or fallback))
    for usage, doc in sorted(entries):
        # only the first line of the documentation is shown here
        summary = doc.strip().splitlines()[0]
        print(f'{usage:<30} {summary}', file=sys.stderr)
    print(file=sys.stderr)
@register_command(name="help")
async def help_command(self, args: str) -> None:
    """Show detailed help for a command"""
    def describe(usage: str, handler: typing.Callable) -> None:
        # one help section: usage header, blank line, dedented doc, blank line
        print(f'>>>> {usage}', file=sys.stderr)
        print(file=sys.stderr)
        print(inspect.cleandoc(handler.__doc__ or 'No help available'), file=sys.stderr)
        print(file=sys.stderr)

    command = args.strip()
    if command not in self._known_commands:
        print(f'Command {command} not known, no help available', file=sys.stderr)
        return
    simple_command = self._simple_commands.get(command, None)
    long_command = self._long_commands.get(command, None)
    if simple_command is long_command:
        # a single handler that takes an optional argument
        describe(f'{command} [args]', simple_command)
        return
    # separate handlers for the variants without and with an argument
    if simple_command:
        describe(f'{command}', simple_command)
    if long_command:
        describe(f'{command} <args>', long_command)
@register_command()
async def exit(self) -> None:
    """Exit pqm (or press Ctrl-D on empty prompt)"""
    # Leave the main loop.  The prompt loop catches both Exception and
    # KeyboardInterrupt raised by commands (printing a traceback and
    # continuing), so raising KeyboardInterrupt here never left the loop.
    # SystemExit is not caught there; it propagates out of the event loop
    # and ends the process normally (atexit handlers still run).
    raise SystemExit(0)
def prompt_if_empty_filter(self) -> bool:
    """Ask for confirmation when no filter is configured.

    Returns True if the caller may proceed: either filters are present,
    or the user answered the prompt with something starting with "y".
    """
    if not self.current_filter.expressions:
        print(
            "You currently have no filters configured, which means this action will "
            "potentially run for a lot of mails",
            file=sys.stderr,
        )
        answer = input_ack("Continue anyway (y/N)? ")
        return answer.lower().startswith("y")
    return True
@register_command()
async def health(self) -> None:
    """
    Show generic stats for queues and overly active address
    If the current filter isn't empty show queue counts for both matching and all mails.
    Only shows overly active addresses for mails matching the current filter (or all, if not set); never includes mails on hold.
    """
    # Per-queue counters for the three queues reported by name; mails in
    # any other queue are summed up in the *_other counters.
    tracked = (QueueName.ACTIVE, QueueName.DEFERRED, QueueName.HOLD)
    count_q = dict.fromkeys(tracked, 0)   # mails matching the current filter
    count_q_other = 0
    total_q = dict.fromkeys(tracked, 0)   # all mails
    total_q_other = 0
    stats = UserActivityStats()
    for item in await self.source.get_list():
        if item.queue_name in total_q:
            total_q[item.queue_name] += 1
        else:
            total_q_other += 1
        if self.current_filter.matches(item):
            if item.queue_name in count_q:
                count_q[item.queue_name] += 1
            else:
                count_q_other += 1
            stats.count(item)
    stats.prepare()
    servers = self.source.server_list()
    print(f"Servers: {len(servers)} ({', '.join(servers)})")
    if self.current_filter.expressions:
        print(f"Active: {count_q[QueueName.ACTIVE]} (total: {total_q[QueueName.ACTIVE]})")
        print(f"Deferred: {count_q[QueueName.DEFERRED]} (total: {total_q[QueueName.DEFERRED]})")
        print(f"Hold: {count_q[QueueName.HOLD]} (total: {total_q[QueueName.HOLD]})")
        # Show the line whenever such mails exist at all (like the lines
        # above, which show "0 (total: N)"); the closing parenthesis was
        # missing from this message.
        if total_q_other:
            print(f"Other: {count_q_other} (total: {total_q_other})")
    else:
        print(f"Active: {count_q[QueueName.ACTIVE]}")
        print(f"Deferred: {count_q[QueueName.DEFERRED]}")
        print(f"Hold: {count_q[QueueName.HOLD]}")
        if count_q_other:
            print(f"Other: {count_q_other}")
    print()
    print("Looking for users with more many open communications matching the current search criteria")
    # drain the statistics, one user per iteration, until nothing is left
    while entry := stats.extract_highest_user():
        user, activity, _mails = entry
        print("User {} has {} open communications".format(user, activity))
# Single-character marker per queue (the list commands document these as
# the "queue flags" shown after the queue id; deferred has no marker).
QUEUE_FLAGS = {
    QueueName.HOLD: '!',
    QueueName.ACTIVE: '*',
    QueueName.CORRUPT: '?',
    QueueName.INCOMING: '$',
    QueueName.MAILDROP: '~',
    QueueName.DEFERRED: ' ',
}
async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None:
    """Shared implementation of the list commands.

    ``args`` is an optional extra filter expression that is combined with
    the current filter for this call only.  Without any effective filter
    and without ``all`` the active and hold queues are hidden.
    """
    flt: AndFilter = self.current_filter
    if args:
        # additional restrictions
        try:
            extra = FILTER_PARSER(args)
        except pyparsing.ParseException as e:
            print(pyparsing.ParseException.explain(e, 0))
            # print(e.explain(depth=0), file=sys.stderr)
            return
        flt = flt & extra
    show_filter: Filter = flt
    if not (all or flt.expressions):
        # by default hide active and hold, unless -all variant is used or specific filters are used
        show_filter = NotFilter(QueueFilter([QueueName.ACTIVE, QueueName.HOLD]))
    for mail in show_filter.filter(await self.source.get_list()):
        mail.print(verbose=verbose)
async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]:
mails = {
mail.queue_id: mail
for mail in await self.source.get_list()
}
missing = []
verified_list = []
for queue_id in given_ids:
item = mails.get(queue_id, None)
if item:
item.print(verbose=False)
verified_list.append(item)
else:
missing.append(queue_id)
if missing:
print(f"Couldn't find mails with the following IDs: {' '.join(missing)}", file=sys.stderr)
return verified_list
@register_command()
async def list(self, args: str = '') -> None:
    """
    List all mails (single line per mail); if no filter is configured hide active and hold queue
    Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
    Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
    """
    # short output, default queue hiding
    options = {'verbose': False, 'all': False}
    await self.list_impl(args=args, **options)
@register_command(name="list-verbose")
async def list_verbose(self, args: str = '') -> None:
    """
    List all mails (verbose output); if no filter is configured hide active and hold queue
    Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
    Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
    """
    # verbose output, default queue hiding
    options = {'verbose': True, 'all': False}
    await self.list_impl(args=args, **options)
@register_command(name="list-all")
async def list_all(self, args: str = '') -> None:
    """
    List all mails (including active + hold queue, single line per mail)
    Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
    Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
    """
    # short output, nothing hidden
    options = {'verbose': False, 'all': True}
    await self.list_impl(args=args, **options)
@register_command(name="list-verbose-all")
async def list_verbose_all(self, args: str = '') -> None:
    """
    List all mails (including active + hold queue, verbose output)
    Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
    Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
    """
    # verbose output, nothing hidden
    options = {'verbose': True, 'all': True}
    await self.list_impl(args=args, **options)
@register_command(name="select")
async def select(self, args: str) -> None:
    """
    Add filter for mails that other commands work on
    Filter syntax:
    - `from bob@example.com`, `from alice@example.com,bob@example.com,@example.net`, `from "alice@example.com" "bob@example.com"`
    Multiple address can be given to match any of them; either unquoted and comma separated, or quoted and whitespace
    separated. In the unquoted case the (comma separated) pattern must not include any whitespace.
    An address starting with `@` only checks the domain name and ignores the local part.
    - `from ~regex.*@example.(com|net)`, `from ~"regex.*@example.(com|net)" "other@example.com"`
    To use regular expressions either put `~` in front of an unquoted pattern (need to repeat for each regex in a
    comma separated pattern list) or before the quotes of a quoted pattern.
    - `to ...` and `address ...` (matching both to and from) work the same as `from ...`
    - `queue hold,deferred`
    Comma separated list of queues a mail must be in. For single queue names `queue` can be omitted, e.g. `hold`.
    - `$expr1 and $expr2`
    - `$expr1 or $expr2` (`a and b or c` is parsed as `a and (b or c)`)
    - `($expr)`
    - `not $expr`
    """
    # The docstring above is the user-facing help text for this command.
    # Its quoted-regex example was missing the closing quote, which made
    # the example itself an invalid filter; that is the only text change.
    try:
        flt = FILTER_PARSER(args)
    except pyparsing.ParseException as e:
        print(pyparsing.ParseException.explain(e, 0))
        # print(e.explain(depth=0), file=sys.stderr)
        return
    # every selected filter is added to the list of active filters
    self.current_filter.expressions.append(flt)
@register_command()
async def clear(self) -> None:
    """Clear all filters"""
    # replace the list with a fresh empty one
    no_filters: list = []
    self.current_filter.expressions = no_filters
@register_command(name="clear")
async def clear_ndx(self, args: str) -> None:
    """Clear filter with given index (zero based; first filter is at index 0)"""
    # Bad input used to surface as a raw traceback (ValueError from int(),
    # IndexError from pop()); report it as a normal error message instead.
    try:
        ndx = int(args)
    except ValueError:
        print(f"Invalid filter index: {args.strip()!r}", file=sys.stderr)
        return
    expressions = self.current_filter.expressions
    try:
        expressions.pop(ndx)
    except IndexError:
        print(f"No filter at index {ndx} ({len(expressions)} filters configured)", file=sys.stderr)
@register_command(name="pop")
async def pop(self) -> None:
    """Remove last filter"""
    active_filters = self.current_filter.expressions
    if not active_filters:
        print("No filters configured, nothing to pop", file=sys.stderr)
        return
    active_filters.pop()
@register_command()
async def current(self) -> None:
    """Show current filters"""
    # one line per filter, formatted like the `select` command that adds it
    for expression in self.current_filter.expressions:
        line = f'select {expression}'
        print(line, file=sys.stderr)
# Argument parser for the `show` command; also the source of its help text.
# add_help=False because `-h` is used for "header" here.
_SHOW_ARGS_PARSER = argparse.ArgumentParser('show', description="Show mails", add_help=False)
_SHOW_ARGS_PARSER.add_argument('-b', dest='body', action='store_true', help="Show body content")
_SHOW_ARGS_PARSER.add_argument('-e', dest='envelope', action='store_true', help="Show message envelope content.")
_SHOW_ARGS_PARSER.add_argument('-h', dest='header', action='store_true', help="Show message header content.")
_SHOW_ARGS_PARSER.add_argument('ID', nargs='+')

@register_command()
async def show(self, args: str) -> None:
    # No docstring here: __doc__ is assigned from the parser's help below.
    try:
        ns_args = self._SHOW_ARGS_PARSER.parse_args(shlex.split(args))
    except ValueError as e:
        # shlex could not split the line (e.g. unbalanced quotes)
        print(f"Invalid arguments: {e}", file=sys.stderr)
        return
    except SystemExit:
        # argparse reports usage errors by printing to stderr and exiting;
        # a mistyped `show` must not terminate the whole interactive session.
        return
    requested = (
        ('-b', ns_args.body),
        ('-e', ns_args.envelope),
        ('-h', ns_args.header),
    )
    # default to header only for now
    flags = [flag for flag, enabled in requested if enabled] or ['-h']
    head = 16 * "-"

    async def print_mail(result: tuple[str, bytes, bytes]) -> None:
        # print one fetched mail: separator with its id, then stderr, then content
        msg, mail, stderr = result
        print(f'{head} {msg:^16} {head}')
        sys.stdout.flush()
        if stderr:
            sys.stderr.buffer.write(stderr)
            sys.stderr.flush()
        sys.stdout.buffer.write(mail)
        sys.stdout.flush()

    async def get_mail(msg: str) -> tuple[str, bytes, bytes]:
        # fetch one mail; make sure both outputs end with a newline
        mail, err = await self.source.get_mail(msg, flags=flags)
        if err and not err.endswith(b'\n'):
            err += b'\n'
        mail = mail or b''
        if mail and not mail.endswith(b'\n'):
            mail += b'\n'
        return (msg, mail, err)

    async with trio_parallel_ordered(print_mail) as tpo:
        for msg in ns_args.ID:
            await tpo.start(get_mail, msg)
    print(head + 18*"-" + head)

show.__doc__ = _SHOW_ARGS_PARSER.format_help()
@register_command(name="flush-all")
async def flush_all(self) -> None:
    """
    Flush the queue: attempt to deliver all queued mail.
    Ignores the current filters (but prompts if you have filters).
    """
    filters_present = bool(self.current_filter.expressions)
    if filters_present:
        # warn: the configured filters are NOT applied by this command
        print(
            "You have currently configured filters, but this command will flush all mails, whether they match the filter or not.\n"
            "You might want to use the `flush` command instead (or clear the filters before)",
            file=sys.stderr,
        )
        answer = input_ack("Continue anyway (y/N)? ")
        if not answer.lower().startswith("y"):
            return
    await self.source.flush_all()
@register_command()
async def flush(self) -> None:
    """Flush (deferred) selected mails in the queue: attempt to deliver them"""
    proceed = self.prompt_if_empty_filter()
    if not proceed:
        return
    # only mails in the deferred queue are flushed
    deferred_only = QueueFilter([QueueName.DEFERRED]) & self.current_filter
    queue_ids = deferred_only.filter_ids(await self.source.get_list())
    await self.source.flush(queue_ids)
@register_command(name="flush")
async def flush_args(self, args: str) -> None:
    """Flush (deferred) mails with given IDs: attempt to deliver them"""
    # ids are separated by whitespace
    queue_ids = args.strip().split()
    await self.source.flush(queue_ids)
async def delete_impl(self, flt: Filter) -> None:
    """List the mails matching ``flt`` and delete them after confirmation."""
    selected = [*flt.filter(await self.source.get_list())]
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really delete those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    # restrict to selected queues to prevent accidents
    queues = flt.possible_queues()
    queue_ids = (mail.queue_id for mail in selected)
    await self.source.delete(queue_ids, from_queues=queues)
@register_command()
async def delete(self) -> None:
    """Delete selected mails from the hold queue"""
    if self.current_filter.expressions:
        # restrict to hold queue
        hold_only = QueueFilter([QueueName.HOLD]) & self.current_filter
        await self.delete_impl(hold_only)
    else:
        print("Command not allowed without filter")
@register_command(name="delete-all")
async def delete_all(self) -> None:
    """Delete selected mails from all queues"""
    if self.current_filter.expressions:
        # no queue restriction: the current filter is used as is
        await self.delete_impl(self.current_filter)
    else:
        print("Command not allowed without filter")
@register_command(name="delete")
async def delete_args(self, args: str) -> None:
    """Delete mails with given IDs from the queues"""
    # only ids that exist in the queue listing are shown and deleted
    found = await self._print_mails_with_ids_for_ack(args.strip().split())
    delete_list = [mail.queue_id for mail in found]
    answer = input_ack("Really delete those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    await self.source.delete(delete_list)
@register_command()
async def hold(self) -> None:
    """Put selected mails on hold"""
    proceed = self.prompt_if_empty_filter()
    if not proceed:
        return
    # basically: skip mails that are already on hold
    source_queues = [QueueName.INCOMING, QueueName.ACTIVE, QueueName.DEFERRED]
    flt = QueueFilter(source_queues) & self.current_filter
    queues = flt.possible_queues()
    queue_ids = flt.filter_ids(await self.source.get_list())
    await self.source.hold(queue_ids, from_queues=queues)
@register_command(name="hold")
async def hold_args(self, args: str) -> None:
    """Put mails with given IDs on hold"""
    # ids are separated by whitespace
    queue_ids = args.strip().split()
    await self.source.hold(queue_ids)
@register_command()
async def release(self) -> None:
    """Release mail that was put "on hold"."""
    proceed = self.prompt_if_empty_filter()
    if not proceed:
        return
    # only mails in the hold queue can be released
    held_only = QueueFilter([QueueName.HOLD]) & self.current_filter
    queues = held_only.possible_queues()
    queue_ids = held_only.filter_ids(await self.source.get_list())
    await self.source.release(queue_ids, from_queues=queues)
@register_command(name="release")
async def release_args(self, args: str) -> None:
    """Release mails with given IDs that were put "on hold"."""
    # ids are separated by whitespace
    queue_ids = args.strip().split()
    await self.source.release(queue_ids)
@register_command()
async def requeue(self) -> None:
    """Requeue mail (move to "maildrop"; get reprocessed)"""
    # first confirmation: the operation itself
    answer = input_ack("Requeue is a rather unusual operation. Do you know what you are doing and want to continue (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    # second confirmation (only asked when no filter is set)
    if not self.prompt_if_empty_filter():
        return
    flt = self.current_filter
    queues = flt.possible_queues()
    queue_ids = flt.filter_ids(await self.source.get_list())
    await self.source.requeue(queue_ids, from_queues=queues)
@register_command(name="requeue")
async def requeue_args(self, args: str) -> None:
    """Requeue mail with given IDs (move to "maildrop"; get reprocessed)"""
    answer = input_ack("Requeue is a rather unusual operation. Do you know what you are doing and want to continue (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    # ids are separated by whitespace
    queue_ids = args.strip().split()
    await self.source.requeue(queue_ids)
@register_command()
async def expire(self) -> None:
    """Expire mails and flush them (return error to sender)."""
    if not self.prompt_if_empty_filter():
        return
    flt = self.current_filter
    selected = [*flt.filter(await self.source.get_list())]
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    queues = flt.possible_queues()
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire(all_ids, from_queues=queues)
    # only mails from the deferred queue are flushed afterwards
    deferred_ids = (mail.queue_id for mail in selected if mail.queue_name == QueueName.DEFERRED)
    await self.source.flush(deferred_ids)
@register_command(name="expire")
async def expire_args(self, args: str) -> None:
    """Expire mails (return error to sender) with given IDs and flush them."""
    selected = await self._print_mails_with_ids_for_ack(args.strip().split())
    answer = input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire(all_ids)
    # only mails from the deferred queue are flushed afterwards
    deferred_ids = (mail.queue_id for mail in selected if mail.queue_name == QueueName.DEFERRED)
    await self.source.flush(deferred_ids)
@register_command(name="expire-release")
async def expire_release(self) -> None:
    """Expire mails and flush them (return error to sender); release if mails are on hold."""
    if not self.prompt_if_empty_filter():
        return
    flt = self.current_filter
    selected = [*flt.filter(await self.source.get_list())]
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    queues = flt.possible_queues()
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire_release(all_ids, from_queues=queues)
    # flush what was in the deferred or hold queue when listed
    flushable = (QueueName.DEFERRED, QueueName.HOLD)
    flush_ids = (mail.queue_id for mail in selected if mail.queue_name in flushable)
    await self.source.flush(flush_ids)
@register_command(name="expire-release")
async def expire_release_args(self, args: str) -> None:
    """Expire mails (return error to sender) with given IDs and flush them; release if mails are on hold."""
    selected = await self._print_mails_with_ids_for_ack(args.strip().split())
    answer = input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire_release(all_ids)
    # flush what was in the deferred or hold queue when listed
    flushable = (QueueName.DEFERRED, QueueName.HOLD)
    flush_ids = (mail.queue_id for mail in selected if mail.queue_name in flushable)
    await self.source.flush(flush_ids)
@register_command(name="expire-noflush")
async def expire_noflush(self) -> None:
    """Expire mails (return error to sender on next delivery attempt)."""
    if not self.prompt_if_empty_filter():
        return
    flt = self.current_filter
    selected = [*flt.filter(await self.source.get_list())]
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really expire those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    queues = flt.possible_queues()
    # no flush afterwards, unlike the plain `expire` command
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire(all_ids, from_queues=queues)
@register_command(name="expire-noflush")
async def expire_noflush_args(self, args: str) -> None:
    """Expire mails (return error to sender on next delivery attempt) with given IDs."""
    selected = await self._print_mails_with_ids_for_ack(args.strip().split())
    answer = input_ack("Really expire those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    # no flush afterwards, unlike the plain `expire` command
    all_ids = (mail.queue_id for mail in selected)
    await self.source.expire(all_ids)
@register_command(name="forward-to")
async def forward_to(self, args: str) -> None:
    """
    Forward mails to given recipients.
    Specify recipients as simple mail addresses (e.g. `alice@example.com`, not `Alice <alice.example.com>`).
    """
    if not args:
        print("No recipients given")
        return
    if not self.prompt_if_empty_filter():
        return
    selected = [*self.current_filter.filter(await self.source.get_list())]
    for mail in selected:
        mail.print(verbose=False)
    answer = input_ack("Really forward those mails (y/N)? ")
    if not answer.lower().startswith("y"):
        return
    # recipients are separated by whitespace
    recipients = args.split()
    await self.source.forward_to(mails=selected, recipients=recipients)
async def prompt(self) -> None:
    """Main loop of the CLI: read a line, dispatch it to a command, repeat.

    Returns on end of input (Ctrl-D).  Ctrl-C at the prompt only discards
    the current line.  Exceptions (and KeyboardInterrupt) raised by a
    command are printed as a traceback and the loop continues.
    """
    while True:
        try:
            full_line = input('# ')
        except KeyboardInterrupt:  # ctrl-c
            print('^C', file=sys.stderr)
            continue
        except EOFError:  # ctrl-d
            print()  # newline after prompt
            return

        stripped_line = full_line.strip()
        last_entry = readline.get_current_history_length() - 1
        if full_line.startswith(' '):
            # don't add space prefixed lines to history
            readline.remove_history_item(last_entry)
        elif stripped_line != full_line:
            # don't store trailing spaces
            readline.replace_history_item(last_entry, stripped_line)

        words = stripped_line.split(maxsplit=1)
        if not words:
            continue
        command = words[0]
        has_arg = len(words) > 1

        try:
            if has_arg:
                if command in self._long_commands:
                    await self._long_commands[command](words[1])
                    continue
            elif command in self._simple_commands:
                await self._simple_commands[command]()
                continue
        except (Exception, KeyboardInterrupt):
            # TODO: skip first frame (only pointing to line above)
            traceback.print_exc()
            continue

        # unknown command / invalid invocation
        if command in self._simple_commands:
            problem = f"Command {command} doesn't take any arguments"
        elif command in self._long_commands:
            problem = f"Command {command} requires arguments"
        else:
            problem = f"Unknown command: {command}"
        print(problem, file=sys.stderr)
def main() -> None:
    """Entry point: parse the command line, load the config and run the CLI."""
    parser = argparse.ArgumentParser(description="postfix (cluster) queue manager")
    parser.add_argument('--config', '-c', type=pathlib.Path, help="Path to config file (default: try ~/.config/pqm.yaml, /etc/pqm.yaml)")
    args = parser.parse_args()
    # without --config an empty name is passed, which lets Config.load
    # search the default locations
    config = Config.load(args.config or '')
    cli = CLI(source=config.source())
    try:
        trio.run(cli.prompt)
    except KeyboardInterrupt:
        # Ctrl-C outside of the prompt handling: leave quietly
        pass


# Only start the interactive session when executed as a script, so the
# module can be loaded (e.g. for testing) without side effects.
if __name__ == "__main__":
    main()