359 lines
11 KiB
Python
359 lines
11 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
from __future__ import annotations
|
||
|
|
||
|
import base64
|
||
|
import dataclasses
|
||
|
import fcntl
|
||
|
import hmac
|
||
|
import http
|
||
|
import http.server
|
||
|
import logging
|
||
|
import math
|
||
|
import os
|
||
|
import os.path
|
||
|
import shutil
|
||
|
import shlex
|
||
|
import signal
|
||
|
import subprocess
|
||
|
import sys
|
||
|
import traceback
|
||
|
import urllib.parse
|
||
|
|
||
|
import trio
|
||
|
import yaml
|
||
|
|
||
|
|
||
|
_log = logging.getLogger('git-build-triggers')
|
||
|
logging.basicConfig(
|
||
|
format='%(asctime)s: %(levelname)s: %(message)s',
|
||
|
level=logging.INFO,
|
||
|
)
|
||
|
|
||
|
|
||
|
class UnixFileLock:
    """Non-blocking, exclusive advisory lock on a file, based on ``flock``.

    The lock belongs to the file descriptor opened by :meth:`acquire` and is
    held until :meth:`release` is called. An instance is not re-entrant.
    """

    __slots__ = ('_path', '_fd')

    def __init__(self, path: str) -> None:
        """Remember *path*; the file is only opened by :meth:`acquire`."""
        self._path = path
        # descriptor of the locked file, or None while the lock is not held
        self._fd: int | None = None

    def acquire(self) -> bool:
        """Try to take the lock without blocking.

        Returns:
            True if the lock was taken, False if ``flock`` refused it
            (typically because another descriptor already holds it).

        Raises:
            RuntimeError: if this instance already holds the lock.
            OSError: if the lock file cannot be opened or created.
        """
        if self._fd is not None:
            raise RuntimeError(f"UnixFileLock({self._path!r}) already locked; re-entry not allowed")

        fd = os.open(self._path, os.O_RDWR | os.O_CREAT | os.O_TRUNC)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # could not lock: do not keep the descriptor around
            os.close(fd)
            return False
        self._fd = fd
        return True

    def release(self) -> None:
        """Drop the lock if it is held; calling it again is a no-op."""
        fd, self._fd = self._fd, None
        if fd is None:
            return
        try:
            fcntl.flock(fd, fcntl.LOCK_UN)
        finally:
            # always close, even if the explicit unlock failed, so the
            # descriptor cannot leak
            os.close(fd)
|
||
|
|
||
|
|
||
|
@dataclasses.dataclass(slots=True, kw_only=True)
|
||
|
class Job:
|
||
|
repository: Repository
|
||
|
lock: UnixFileLock
|
||
|
|
||
|
|
||
|
@dataclasses.dataclass(slots=True)
class JobQueue:
    """Unbounded queue of build jobs, run with limited parallelism.

    Jobs are submitted from a worker thread with :meth:`queue` and consumed
    by :meth:`run` inside the Trio event loop.
    """

    # maximum number of jobs updating at the same time
    parallel: int
    # sending / receiving halves of the in-memory job channel
    _queue: trio.MemorySendChannel[Job]
    _rx: trio.MemoryReceiveChannel[Job]

    def __init__(self, *, parallel: int = 1) -> None:
        """Create an empty queue that runs at most *parallel* jobs at once."""
        self.parallel = parallel
        self._queue, self._rx = trio.open_memory_channel(math.inf)

    async def run(self) -> None:
        """Run queued jobs until the queue is stopped and drained.

        Each job gets its own task; a capacity limiter bounds how many
        repository updates run concurrently.
        """
        limit = trio.CapacityLimiter(self.parallel)

        async def work(job: Job) -> None:
            # True once the job (and with it the lock) was handed back to the
            # queue for another round
            requeued = False
            try:
                async with limit:
                    await job.repository.update()
                try:
                    os.remove(job.repository._path_rebuild)
                except FileNotFoundError:
                    # nobody asked for a rebuild while we were running
                    return
                # build again
                await self._queue.send(job)
                requeued = True
            finally:
                # A re-queued job must keep its lock: releasing it here would
                # let the rebuild run unlocked and allow a second build of the
                # same checkout to start in parallel.
                if not requeued:
                    job.lock.release()

        async with trio.open_nursery() as nursery:
            job: Job
            async for job in self._rx:
                nursery.start_soon(work, job)

    def queue(self, job: Job) -> None:
        """Submit *job*; uses ``trio.from_thread`` to reach the Trio loop."""
        trio.from_thread.run(self._queue.send, job)

    def stop(self) -> None:
        """Close the sending side so :meth:`run` ends once the queue is empty."""
        self._queue.close()
|
||
|
|
||
|
|
||
|
# Process-wide job queue; declared here and assigned in main().
JOB_QUEUE: JobQueue
|
||
|
|
||
|
|
||
|
@dataclasses.dataclass(slots=True, kw_only=True)
|
||
|
class Repository:
|
||
|
name: str
|
||
|
workdir: str
|
||
|
token: str = dataclasses.field(repr=False)
|
||
|
command: str
|
||
|
config: Config = dataclasses.field(repr=False)
|
||
|
_path_gitdir: str = dataclasses.field(init=False)
|
||
|
_path_lockfile: str = dataclasses.field(init=False)
|
||
|
_path_lastbuild: str = dataclasses.field(init=False)
|
||
|
_path_rebuild: str = dataclasses.field(init=False)
|
||
|
|
||
|
def __post_init__(self) -> None:
|
||
|
self._path_gitdir = gitdir = os.path.join(self.workdir, ".git")
|
||
|
self._path_lockfile = os.path.join(gitdir, "build.lock")
|
||
|
self._path_lastbuild = os.path.join(gitdir, "build.status")
|
||
|
self._path_rebuild = os.path.join(gitdir, "rebuild_flag")
|
||
|
|
||
|
def _writestatus(self, commit: str, message: str|bytes) -> None:
|
||
|
if isinstance(message, str):
|
||
|
message = message.encode()
|
||
|
tmpname = self._path_lastbuild + ".tmp"
|
||
|
try:
|
||
|
with open(tmpname, "wb") as lastbuild:
|
||
|
lastbuild.write(commit.encode() + b"\n" + message)
|
||
|
os.rename(tmpname, self._path_lastbuild)
|
||
|
except OSError as e:
|
||
|
_log.error(f"{self.name}: Failed to update {self._path_lastbuild}: {e}")
|
||
|
|
||
|
async def _run_git(self, cmd: list[str]) -> bytes:
|
||
|
result: subprocess.CompletedProcess[bytes] = await trio.run_process(
|
||
|
[self.config.git_path] + cmd,
|
||
|
cwd=self.workdir,
|
||
|
capture_stdout=True,
|
||
|
)
|
||
|
return result.stdout
|
||
|
|
||
|
async def _update(self, last_commit: str) -> None:
|
||
|
_log.info(f"{self.name}: Updating git")
|
||
|
# remove all ignored/untracked files and directories:
|
||
|
await self._run_git(["clean", "--force", "-d", "-x"])
|
||
|
await self._run_git(["remote", "update", "origin"])
|
||
|
branch_name = (await self._run_git(["rev-parse", "--abbrev-ref", "HEAD"])).decode().strip()
|
||
|
await self._run_git(["reset", "--hard", f"origin/{branch_name}"])
|
||
|
commit_id = (await self._run_git(["rev-parse", "HEAD"])).decode().strip()
|
||
|
if last_commit == commit_id:
|
||
|
_log.info(f"{self.name}: No changes (still {last_commit})")
|
||
|
return # no changes
|
||
|
|
||
|
build: subprocess.CompletedProcess[bytes] = await trio.run_process(
|
||
|
shlex.split(self.command),
|
||
|
cwd=self.workdir,
|
||
|
capture_stdout=True,
|
||
|
stderr=subprocess.STDOUT,
|
||
|
check=False,
|
||
|
)
|
||
|
self._writestatus(commit_id, f"Exit code: {build.returncode}\n".encode() + build.stdout)
|
||
|
# again: remove all ignored/untracked files and directories:
|
||
|
await self._run_git(["clean", "--force", "-d", "-x"])
|
||
|
_log.info(f"{self.name}: Built {commit_id} with exit status {build.returncode}")
|
||
|
|
||
|
async def update(self) -> None:
|
||
|
"""should only be called while holding lock"""
|
||
|
try:
|
||
|
with open(self._path_lastbuild, "rb") as lastbuild:
|
||
|
last_commit = lastbuild.readline().strip().decode()
|
||
|
except FileNotFoundError:
|
||
|
last_commit = "none"
|
||
|
try:
|
||
|
await self._update(last_commit)
|
||
|
except Exception as e:
|
||
|
_log.error(f"{self.name}: Failed {last_commit}: {e}")
|
||
|
self._writestatus(last_commit, str(e))
|
||
|
|
||
|
def check(self) -> tuple[int, bytes | str]:
|
||
|
if not os.path.isdir(self._path_gitdir):
|
||
|
return (500, "Missing .git directory")
|
||
|
|
||
|
lock = UnixFileLock(self._path_lockfile)
|
||
|
if lock.acquire():
|
||
|
JOB_QUEUE.queue(Job(repository=self, lock=lock))
|
||
|
else:
|
||
|
# tell current job to restart when finished:
|
||
|
with open(self._path_rebuild, "w"):
|
||
|
pass
|
||
|
|
||
|
# if current job finished before seeing our trigger,
|
||
|
# remove the trigger and just run it ourself
|
||
|
if lock.acquire():
|
||
|
try:
|
||
|
os.remove(self._path_rebuild)
|
||
|
except FileNotFoundError:
|
||
|
# something saw the trigger and handled it
|
||
|
# - no need to build again
|
||
|
pass
|
||
|
else:
|
||
|
JOB_QUEUE.queue(Job(repository=self, lock=lock))
|
||
|
|
||
|
try:
|
||
|
with open(self._path_lastbuild) as f:
|
||
|
return (200, f.read())
|
||
|
except FileNotFoundError:
|
||
|
return (200, "never built yet")
|
||
|
|
||
|
|
||
|
@dataclasses.dataclass(slots=True, kw_only=True)
|
||
|
class Config:
|
||
|
repositories: dict[str, Repository]
|
||
|
address: str
|
||
|
port: int
|
||
|
basepath: str
|
||
|
admin_token: str
|
||
|
git_path: str
|
||
|
parallel_jobs: int
|
||
|
|
||
|
def handle(self, req_path: str, auth: str) -> tuple[int, bytes | str]:
|
||
|
url = urllib.parse.urlparse(req_path)
|
||
|
if auth.startswith("Basic "):
|
||
|
creds = base64.decodebytes(auth.removeprefix("Basic ").strip().encode())
|
||
|
token = creds.split(b":", maxsplit=1)[1].decode()
|
||
|
elif auth.startswith("Bearer "):
|
||
|
token = auth.removeprefix("Bearer ").strip()
|
||
|
else:
|
||
|
return (401, "Missing authentication")
|
||
|
name = url.path.removeprefix(self.basepath).strip("/")
|
||
|
if not name in self.repositories:
|
||
|
return (404, "Not found")
|
||
|
repo = self.repositories[name]
|
||
|
if (
|
||
|
not hmac.compare_digest(token, repo.token)
|
||
|
and not (self.admin_token and hmac.compare_digest(token, self.admin_token))
|
||
|
):
|
||
|
return (401, "Invalid token")
|
||
|
return repo.check()
|
||
|
|
||
|
|
||
|
def load_config(path: str) -> Config:
    """Load and validate the YAML configuration file at *path*.

    Top-level keys: ``repositories`` (mapping, required), ``port`` (int,
    required), ``address`` (default ``"127.0.0.1"``), ``parallel-jobs``
    (default 1), ``base-path`` (default ``"/"``) and ``admin-token``
    (default empty; at least 16 characters when set). Each repository entry
    needs ``workdir``, ``token`` (at least 16 characters) and a non-empty
    ``command``.

    Raises:
        ValueError: if a setting is missing or has an invalid value.
        RuntimeError: if no ``git`` executable is found on ``PATH``.
    """

    def require(condition: object, message: str) -> None:
        # Explicit check instead of ``assert``: asserts are removed under
        # ``python -O``, which would silently disable the validation
        # (including the minimum token lengths).
        if not condition:
            raise ValueError(f"{path}: {message}")

    with open(path) as f:
        data = yaml.safe_load(f)

    require(isinstance(data, dict), "top level must be a mapping")
    data_repositories = data.pop('repositories', None)
    require(isinstance(data_repositories, dict), "'repositories' must be a mapping")

    address = data.pop("address", "127.0.0.1")
    require(isinstance(address, str), "'address' must be a string")

    port = data.pop("port", None)
    require(isinstance(port, int), "'port' is required and must be an integer")

    parallel_jobs = data.pop("parallel-jobs", 1)
    require(isinstance(parallel_jobs, int), "'parallel-jobs' must be an integer")

    basepath = data.pop("base-path", "/")
    require(isinstance(basepath, str), "'base-path' must be a string")
    require(not basepath or basepath.startswith("/"), "'base-path' must start with '/'")

    admin_token = data.pop("admin-token", "")
    require(isinstance(admin_token, str), "'admin-token' must be a string")
    require(not admin_token or len(admin_token) >= 16, "'admin-token' must have at least 16 characters")

    git_path = shutil.which("git")
    if not git_path:
        raise RuntimeError("Missing git binary")

    config = Config(
        repositories={},
        address=address,
        port=port,
        basepath=basepath,
        git_path=git_path,
        admin_token=admin_token,
        parallel_jobs=parallel_jobs,
    )
    for repo_name, repo_data in data_repositories.items():
        where = f"repository {repo_name!r}"
        require(isinstance(repo_data, dict), f"{where} must be a mapping")
        workdir = repo_data.pop("workdir", None)
        require(isinstance(workdir, str), f"{where}: 'workdir' must be a string")
        token = repo_data.pop("token", None)
        require(
            isinstance(token, str) and len(token) >= 16,
            f"{where}: 'token' must be a string of at least 16 characters",
        )
        command = repo_data.pop("command", None)
        require(isinstance(command, str) and command, f"{where}: 'command' must be a non-empty string")
        # every repository keeps a reference to the shared config
        config.repositories[repo_name] = Repository(
            name=repo_name,
            config=config,
            workdir=workdir,
            token=token,
            command=command,
        )

    return config
|
||
|
|
||
|
|
||
|
# Process-wide configuration; declared here and assigned in main().
CONFIG: Config
|
||
|
|
||
|
|
||
|
class RequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler that passes every request to ``CONFIG.handle``."""

    server_version = "BuildTrigger"

    def do_POST(self) -> None:
        """Answer a request with the status and plain-text body from the config.

        An exception raised while handling is printed and reported as a 500
        response carrying the exception text.
        """
        status: int
        body: bytes | str

        authorization = self.headers.get("Authorization", "")
        try:
            status, body = CONFIG.handle(self.path, authorization)
        except Exception as exc:
            traceback.print_exception(exc)
            status, body = 500, str(exc)

        if not isinstance(body, str):
            assert isinstance(body, bytes)
            payload = body
        else:
            payload = body.encode()

        response_headers = [
            ("Cache-Control", "no-store"),
            ("Content-Type", "text/plain; charset=utf-8"),
            ("Content-Length", str(len(payload))),
        ]
        if status == 401:
            # ask the client for credentials
            response_headers.append(("WWW-Authenticate", "Basic realm=\"trigger\""))

        self.send_response(status)
        for header_name, header_value in response_headers:
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(payload)

    # GET requests are handled exactly like POST requests
    do_GET = do_POST
|
||
|
|
||
|
|
||
|
def run():
    """Serve HTTP requests and run queued jobs until SIGINT is received."""
    server = http.server.HTTPServer((CONFIG.address, CONFIG.port), RequestHandler)

    def _on_sigint(signum, frame) -> None:
        # stop accepting requests first, then let the job queue finish
        _log.info("Shutdown")
        server.shutdown()
        JOB_QUEUE.stop()

    signal.signal(signal.SIGINT, _on_sigint)

    async def _serve() -> None:
        async with trio.open_nursery() as nursery:
            # the blocking HTTP server loop runs in a worker thread
            nursery.start_soon(trio.to_thread.run_sync, server.serve_forever)
            nursery.start_soon(JOB_QUEUE.run)

    trio.run(_serve)
|
||
|
|
||
|
|
||
|
def main():
    """Parse the command line, load the configuration and start the service."""
    global CONFIG, JOB_QUEUE
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument('--config', required=True, help="Path to YAML config file")
    options = cli.parse_args()

    # the module-level globals are read by the request handler and by run()
    CONFIG = load_config(options.config)
    JOB_QUEUE = JobQueue(parallel=CONFIG.parallel_jobs)

    run()
|
||
|
|
||
|
|
||
|
# Start the service only when executed as a script, so that importing this
# module does not parse arguments or open a server socket.
if __name__ == "__main__":
    main()
|