# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db scopes."""
import enum
import re
from typing import Any, Self, TYPE_CHECKING, TypeAlias, cast
import pgtrigger
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.db import models
from django.db.models import (
CheckConstraint,
Exists,
F,
OuterRef,
Q,
QuerySet,
UniqueConstraint,
)
from debusine.db.models import permissions
from debusine.db.models.files import File, FileStore
from debusine.db.models.permissions import (
Allow,
Role,
permission_check,
permission_filter,
)
from debusine.db.permissioncontext import PermissionContext
if TYPE_CHECKING:
from django.http import HttpRequest
from django_stubs_ext.db.models import TypedModelMeta
from debusine.server.file_backend.interface import FileBackendInterface
from debusine.web.views.ui.scopes import ScopeUI
else:
TypedModelMeta = object
#: Scope names reserved for use in toplevel URL path components
RESERVED_SCOPE_NAMES = frozenset(
    (
        "accounts",
        "admin",
        "api",
        "api-auth",
        "artifact",
        "task-status",
        "user",
        "workers",
        "work-request",
        "workspace",
    )
)

#: Regexp matching the structure of scope names
scope_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")


def is_valid_scope_name(value: str) -> bool:
    """
    Check if value is a valid scope name.

    A name is valid when it is not listed in ``RESERVED_SCOPE_NAMES`` and
    the whole string matches ``scope_name_regex``: an ASCII letter followed
    by any number of ASCII letters, digits, ``+``, ``.``, ``_`` or ``-``.

    :param value: candidate scope name.
    :return: True if the name is acceptable, False otherwise.
    """
    if value in RESERVED_SCOPE_NAMES:
        return False
    # ``$`` also matches just before a trailing newline, so ``match()`` would
    # accept a name such as "debian\n". ``fullmatch()`` only succeeds when
    # the pattern consumes the entire string.
    return scope_name_regex.fullmatch(value) is not None
def validate_scope_name(value: str) -> None:
    """
    Validator for scope names.

    :param value: candidate scope name.
    :raises ValidationError: if ``value`` is rejected by
      :py:func:`is_valid_scope_name`.
    """
    if is_valid_scope_name(value):
        return
    raise ValidationError(
        "%(value)r is not a valid scope name", params={"value": value}
    )
class ScopeRoleBase(permissions.RoleBase):
    """Implication bookkeeping and query helpers shared by scope roles."""

    #: This role together with the scope roles that imply it, as computed
    #: by :py:meth:`_setup`
    implied_by_scope_roles: frozenset["ScopeRoles"]

    def _setup(self) -> None:
        """
        Compute ``implied_by_scope_roles`` for a newly constructed role.

        :raises ImproperlyConfigured: if ``implied_by`` contains anything
          that is not a :py:class:`Role`.
        """
        # Every role implies itself.
        granting: set[ScopeRoles] = {cast(ScopeRoles, self)}
        for item in self.implied_by:
            if not isinstance(item, Role):
                raise ImproperlyConfigured(
                    f"Scope roles do not support implications by {type(item)}"
                )
            # The Role given at class definition time is looked up by value
            # to obtain the enum member, whose own implications are merged.
            resolved = self.__class__(item.value)
            granting.update(resolved.implied_by_scope_roles)
        self.implied_by_scope_roles = frozenset(granting)

    def q(self, pc: PermissionContext) -> Q:
        """
        Return a Q expression to select scopes with this role.

        A scope matches when a role assignment links one of ``pc``'s groups
        to any role listed in ``implied_by_scope_roles``.
        """
        groups = pc.groups()
        granting_roles = self.implied_by_scope_roles
        return Q(roles__group__in=groups, roles__role__in=granting_roles)

    def implies(self, role: "ScopeRoles") -> bool:
        """Return True if holding this role also grants ``role``."""
        granting_roles = role.implied_by_scope_roles
        return self in granting_roles
class ScopeRoles(permissions.Roles, ScopeRoleBase, enum.ReprEnum):
    """Roles that can be assigned on a Scope."""

    OWNER = Role("owner", description="Full administrative access")

    # Read-only access to the whole scope; owners get it by implication.
    OBSERVER = Role(
        "observer",
        description="Read-only access to all workspaces",
        implied_by=[OWNER],
    )

    # "viewer" is reserved for possibly adding private scopes in the future,
    # in which case we may need a distinction between "can view the scope
    # itself" and "can view all workspaces in the scope".


# NOTE(review): setup() is inherited from permissions.Roles; presumably it
# runs each member's _setup() to resolve implications - confirm there.
ScopeRoles.setup()
class ScopeQuerySet[A](QuerySet["Scope", A]):
"""Custom QuerySet for Scope."""
def with_role(self, pc: PermissionContext, role: ScopeRoles) -> Self:
"""Keep only resources where the user has the given role."""
if not pc.user.is_authenticated:
return self.none()
if pc.user.is_system:
return self
return self.filter(
Exists(self.model.objects.filter(role.q(pc), pk=OuterRef("pk")))
)
@permission_filter(work_request=Allow.PASS, anonymous=Allow.PASS)
def can_display(self, pc: PermissionContext) -> Self: # noqa: ARG002, U100
"""Keep only Scopes that can be displayed."""
return self
@permission_filter()
def can_create_workspace(self, pc: PermissionContext) -> Self:
"""Keep only Scopes where the user can create workspaces."""
return self.with_role(pc, Scope.Roles.OWNER)
class ScopeManager(models.Manager["Scope"]):
    """Manager for the Scope model."""

    def get_roles_model(self) -> type["ScopeRole"]:
        """Return the model class that stores role assignments."""
        return ScopeRole

    def get_queryset(self) -> ScopeQuerySet[Any]:
        """Return a ScopeQuerySet bound to this manager's model and db."""
        queryset: ScopeQuerySet[Any] = ScopeQuerySet(
            self.model, using=self._db
        )
        return queryset
class Scope(models.Model):
    """
    Scope model.

    A scope is used to create distinct sets of groups and workspaces.
    """

    Roles: TypeAlias = ScopeRoles

    name = models.CharField(
        max_length=255,
        unique=True,
        validators=[validate_scope_name],
        help_text="internal name for the scope",
    )
    label = models.CharField(
        max_length=255,
        unique=True,
        help_text="User-visible name for the scope",
    )
    icon = models.CharField(
        max_length=255,
        default="",
        blank=True,
        help_text=(
            "Optional user-visible icon,"
            " resolved via ``{% static %}`` in templates"
        ),
    )
    file_stores = models.ManyToManyField(
        FileStore, related_name="scopes", through="db.FileStoreInScope"
    )

    objects = ScopeManager.from_queryset(ScopeQuerySet)()

    class Meta(TypedModelMeta):
        base_manager_name = "objects"

    def __str__(self) -> str:
        """Return the scope name."""
        return self.name

    def ui(self, request: "HttpRequest") -> "ScopeUI":
        """Return the UI helper for this scope and ``request``."""
        # Imported here rather than at module level (at module level the
        # name is only available under TYPE_CHECKING).
        from debusine.web.views.ui.scopes import ScopeUI

        return ScopeUI.get(request, self)

    @permission_check(
        "{user} cannot display scope {resource}",
        work_request=Allow.PASS,
        anonymous=Allow.PASS,
    )
    def can_display(self, pc: PermissionContext) -> bool:  # noqa: ARG002, U100
        """Check if the scope can be displayed (currently always true)."""
        return True

    @permission_check(
        "{user} cannot create workspaces in {resource}",
    )
    def can_create_workspace(self, pc: PermissionContext) -> bool:
        """Check if the user may create workspaces in this scope."""
        owner = Scope.Roles.OWNER
        return self.has_role(pc, owner)

    def has_role(self, pc: PermissionContext, role: ScopeRoles) -> bool:
        """
        Check if the user has the given role on this Scope.

        Unauthenticated users never have a role; the system user has every
        role.
        """
        from debusine.db.context import context

        user = pc.user
        if not user.is_authenticated:
            return False
        if user.is_system:
            return True

        # When ``pc`` is the application context's own permission context
        # and this is its scope, answer from the roles held in the context
        # instead of querying the database.
        if context.pc_is(pc) and context.scope == self:
            for held in context.scope_roles:
                if held.implies(role):
                    return True
            return False

        matching = Scope.objects.with_role(pc, role).filter(pk=self.pk)
        return matching.exists()

    # See https://github.com/typeddjango/django-stubs/issues/1047 for the typing
    def get_group_roles(
        self, pc: PermissionContext
    ) -> QuerySet["ScopeRole", str]:
        """
        Get the roles of the user on this scope.

        :return: a flat query set of distinct role values assigned on this
          scope to any of ``pc``'s groups; empty for unauthenticated users.
        """
        if not pc.user.is_authenticated:
            return ScopeRole.objects.none().values_list("role", flat=True)
        assignments = ScopeRole.objects.filter(
            resource=self, group__in=pc.groups()
        )
        return assignments.values_list("role", flat=True).distinct()

    def upload_file_stores(
        self,
        fileobj: File,
        *,
        enforce_soft_limits: bool = False,
        include_write_only: bool = False,
    ) -> QuerySet[FileStore]:
        """
        Find the file stores in this scope where `fileobj` can be uploaded.

        Read-only stores are skipped, as are stores whose `max_size` would
        be exceeded by adding `fileobj`.

        The returned query set is in descending order of upload priority,
        breaking ties by ascending file store IDs.

        :param fileobj: the file to be uploaded; only its size is used.
        :param enforce_soft_limits: Enforce `soft_max_size` policies as well
          as `max_size`.
        :param include_write_only: Uploading a file to a write-only store
          will mean that debusine won't download it from there again, so
          that only makes sense in specialized situations such as populating
          a backup store.  Set this to True to include write-only stores in
          the returned query set.
        """
        candidates = self.file_stores.annotate_current_total_size()
        candidates = candidates.exclude(filestoreinscope__read_only=True)
        candidates = candidates.exclude(
            max_size__lt=F("current_total_size") + fileobj.size
        )
        candidates = candidates.order_by(
            F("filestoreinscope__upload_priority").desc(nulls_last=True),
            "id",
        )

        if enforce_soft_limits:
            # TODO: We should also enforce FileStoreInScope.soft_max_size.
            candidates = candidates.exclude(
                soft_max_size__lt=F("current_total_size") + fileobj.size
            )

        if not include_write_only:
            candidates = candidates.exclude(filestoreinscope__write_only=True)

        return candidates

    def upload_file_backend(self, fileobj: File) -> "FileBackendInterface[Any]":
        """
        Find the best file backend for uploading `fileobj`.

        This is the backend of the first store returned by
        :py:meth:`upload_file_stores` with its default options.

        :raises IndexError: if there is no such file backend.
        """
        best_store = self.upload_file_stores(fileobj)[0]
        return best_store.get_backend_object()

    def download_file_stores(self, fileobj: File) -> QuerySet[FileStore]:
        """
        Find the file stores in this scope that have `fileobj`, if any.

        Write-only stores are never returned.

        The returned query set is in descending order of download priority,
        breaking ties in descending order of upload priority and then by
        ascending file store IDs.
        """
        readable = self.file_stores.exclude(filestoreinscope__write_only=True)
        holding = readable.filter(files=fileobj)
        return holding.order_by(
            F("filestoreinscope__download_priority").desc(nulls_last=True),
            F("filestoreinscope__upload_priority").desc(nulls_last=True),
            "id",
        )

    def download_file_backend(
        self, fileobj: File
    ) -> "FileBackendInterface[Any]":
        """
        Find the best file backend for downloading `fileobj`.

        This is the backend of the first store returned by
        :py:meth:`download_file_stores`.

        :raises IndexError: if there is no such file backend.
        """
        best_store = self.download_file_stores(fileobj)[0]
        return best_store.get_backend_object()
class ScopeRole(models.Model):
    """Assignment of a scope role to a group."""

    Roles: TypeAlias = ScopeRoles

    # The scope on which the role is granted.
    resource = models.ForeignKey(
        Scope, on_delete=models.CASCADE, related_name="roles"
    )
    # The group receiving the role.
    group = models.ForeignKey(
        "Group", on_delete=models.CASCADE, related_name="scope_roles"
    )
    role = models.CharField(choices=Roles.choices)

    class Meta(TypedModelMeta):
        constraints = [
            # A group can be given each role on a scope at most once.
            UniqueConstraint(
                name="%(app_label)s_%(class)s_unique_resource_group_role",
                fields=["resource", "group", "role"],
            ),
        ]

    def __str__(self) -> str:
        """Describe the assignment as group, role and scope."""
        return f"{self.group}─{self.role}⟶{self.resource}"
class FileStoreInScope(models.Model):
    """Through model holding extra data on Scope/FileStore relations."""

    scope = models.ForeignKey(Scope, on_delete=models.PROTECT)
    file_store = models.ForeignKey(FileStore, on_delete=models.PROTECT)

    # Copy of FileStore.instance_wide kept in this table by the database
    # trigger declared in Meta; the partial unique constraint there uses it.
    _file_store_instance_wide = models.BooleanField(
        db_column="file_store_instance_wide",
        default=True,
        editable=False,
        help_text="Synced from FileStore.instance_wide; do not update directly",
    )

    #: The priority of this store for the purpose of storing new files.
    #: When adding a new file, debusine tries stores whose policies allow
    #: adding new files in descending order of upload priority, counting
    #: null as the lowest.
    upload_priority = models.IntegerField(blank=True, null=True)

    #: The priority of this store for the purpose of serving files to
    #: clients.  When downloading a file, debusine tries stores in
    #: descending order of download priority, counting null as the lowest;
    #: it breaks ties in descending order of upload priority, again counting
    #: null as the lowest.  If there is still a tie, it picks one of the
    #: possibilities arbitrarily.
    download_priority = models.IntegerField(blank=True, null=True)

    #: If True, the storage maintenance job ensures that this store has a
    #: copy of all files in the scope.
    populate = models.BooleanField(default=False)

    #: If True, the storage maintenance job moves all files in this scope to
    #: some other store in the same scope, following the same rules for
    #: finding a target store as for uploads of new files.  It does not move
    #: into a store if that would take its total size over `soft_max_size`
    #: (either for the scope or the file store), and it logs an error if it
    #: cannot find any eligible target store.
    drain = models.BooleanField(default=False)

    #: If this field is set, then constrain `drain` to use the store with
    #: the given name in this scope.
    #
    # TODO: ruff is correct that we shouldn't use null=True on a TextField,
    # but fixing that retroactively is non-trivial.
    drain_to = models.TextField(blank=True, null=True)  # noqa: DJ001

    #: If True, debusine will not add new files to this store.  Use this in
    #: combination with `drain` to prepare for removing the file store.
    read_only = models.BooleanField(default=False)

    #: If True, debusine will not read files from this store.  This is
    #: suitable for provider storage classes that are designed for long-term
    #: archival rather than routine retrieval, such as S3 Glacier Deep
    #: Archive.
    write_only = models.BooleanField(default=False)

    #: An integer specifying the number of bytes that the file store can
    #: hold for this scope (accounting files that are in multiple scopes to
    #: all of the scopes in question).  This limit may be exceeded
    #: temporarily during uploads; the storage maintenance job will move the
    #: least-recently-used files to another file store to get back below the
    #: limit.
    soft_max_size = models.IntegerField(blank=True, null=True)

    class Meta(TypedModelMeta):
        triggers = [
            # Before each insert or update, overwrite the synced column with
            # the instance_wide value of the referenced file store.
            pgtrigger.Trigger(
                name="db_filestoreinscope_sync_instance_wide",
                operation=pgtrigger.Insert | pgtrigger.Update,
                when=pgtrigger.Before,
                # split()/join() collapses the layout whitespace of the
                # SQL snippet into single spaces.
                func=" ".join(
                    """
                    NEW.file_store_instance_wide =
                        (SELECT db_filestore.instance_wide
                         FROM db_filestore
                         WHERE db_filestore.id = NEW.file_store_id);
                    RETURN NEW;
                    """.split()
                ),
            )
        ]
        constraints = [
            # A file store can be attached to a given scope only once.
            UniqueConstraint(
                fields=["scope", "file_store"],
                name="%(app_label)s_%(class)s_unique_scope_file_store",
            ),
            # A file store that is not instance-wide can appear in at most
            # one row of this table.
            UniqueConstraint(
                fields=["file_store"],
                name=(
                    "%(app_label)s_%(class)s_"
                    "unique_file_store_not_instance_wide"
                ),
                condition=Q(_file_store_instance_wide=False),
            ),
            # It does not make sense to request a store to be populated
            # while also requesting it to be either drained or read-only.
            CheckConstraint(
                name="%(app_label)s_%(class)s_consistent_populate",
                check=Q(populate=False) | Q(drain=False, read_only=False),
            ),
        ]

    def __str__(self) -> str:
        """Return the scope and the file store name, separated by a slash."""
        return f"{self.scope}/{self.file_store.name}"