Zum Inhalt

API Reference

DownloadError

Bases: Exception

Raised when a download request fails (HTTP error, server error, or network issue).

Source code in src/form_download_helper/downloader.py
class DownloadError(Exception):
    """Signal that a download request could not be completed.

    Covers HTTP error statuses, server-side error responses, and network
    failures.
    """

DownloadOZG

Source code in src/form_download_helper/downloader.py
class DownloadOZG:
    """Download one OZG form (or wizard) and scaffold a Python package for it.

    ``get()`` is the entry point: it fetches the schema, the two UI schemas
    and the template for ``uri``, writes them below
    ``<path>/<form_name>/<formatted version>/`` and generates ``form.py``,
    ``__init__.py`` files and test stubs next to them.
    """

    # Seconds to wait for the server before a request is treated as failed.
    DEFAULT_TIMEOUT = 60
    # Class-level fallback so instances created without __init__ still work.
    timeout = DEFAULT_TIMEOUT

    def __init__(self, path, uri, username, password, version, package=None,
                 timeout=DEFAULT_TIMEOUT):
        """Store the download configuration.

        Args:
            path: Target directory for the generated files.
            uri: URI of the form on the server.
            username: User name for HTTP basic authentication.
            password: Password for HTTP basic authentication.
            version: Form version to download. The value ``"dev"`` selects
                the live (unversioned) endpoints.
            package: Python package name passed to the wizard template.
                Derived from ``path`` when omitted.
            timeout: Timeout in seconds handed to ``requests.get`` (a float
                or a ``(connect, read)`` tuple).
        """
        self.path = path
        self.uri = uri
        self.auth = HTTPBasicAuth(username, password)
        self.version = version
        self.is_dev = str(version) == "dev"
        self.package = package or derive_package_name(Path(path))
        self.timeout = timeout
        # Bookkeeping for the summary; initialised here so that _write() and
        # download_wizard_steps() also work when called without get().
        self._written = []
        self._skipped = []
        self._errors = []

    def _fetch(self, url):
        """Centralized HTTP GET with logging, error checking, and network handling.

        Returns the response body as text.

        Raises DownloadError on HTTP errors, server-side error responses, or
        network failures (including timeouts).
        """
        console.print(f"  GET {url}")
        try:
            # Without a timeout a stalled server would block the tool forever.
            response = requests.get(url, auth=self.auth, timeout=self.timeout)
        except requests.RequestException as exc:
            raise DownloadError(f"Network error: {exc}") from exc

        if response.status_code != 200:
            raise DownloadError(f"HTTP {response.status_code} for {url}")

        text = response.text
        # Detect server-side error bodies like {"error_type": "ArchivistRetrieveError"}
        try:
            body = json.loads(text)
        except ValueError:
            # Not JSON — that's fine (e.g. HTML templates).
            # json.JSONDecodeError is a ValueError subclass.
            return text

        if isinstance(body, dict) and "error_type" in body:
            detail = body.get("detail", "no details")
            raise DownloadError(f"{body['error_type']} - {detail}")
        return text

    def _get_schema(self, base_url):
        """Download meta-schema JSON. Uses versioned endpoint or live fallback."""
        if self.is_dev:
            return self._fetch(f"{base_url}/@@meta-schema")
        return self._fetch(
            f"{base_url}/@@form-version-view?version={self.version}"
        )

    def _get_ui_schema(self, base_url):
        """Download UI schema JSON. Uses versioned endpoint or live fallback."""
        if self.is_dev:
            return self._fetch(f"{base_url}/@@simple-ui-schema-view")
        return self._fetch(
            f"{base_url}/@@uvcx-version-view?version={self.version}&schema=simple"
        )

    def _get_client_ui_schema(self, base_url):
        """Download client UI schema JSON. Uses versioned endpoint or live fallback."""
        if self.is_dev:
            return self._fetch(f"{base_url}/@@ui-schema-view")
        return self._fetch(
            f"{base_url}/@@version-view?version={self.version}&schema=ui"
        )

    def _write(self, path, content):
        """Write ``content`` to ``path`` as UTF-8 and track it for the summary."""
        Path(path).write_text(content, encoding="utf-8")
        self._written.append(str(path))
        console.print(f"    + {Path(path).name}")

    def _write_step_template(self, step_dir, template):
        """Write a step's ``template.pt`` with all ``csrf_token`` lines removed.

        Nothing is written when the response starts with ``{`` — presumably
        a JSON body means the server has no template for the step (TODO
        confirm against the server).
        """
        if template.strip().startswith("{"):
            return
        cleaned = "\n".join(
            line for line in template.splitlines() if "csrf_token" not in line
        )
        self._write(step_dir / "template.pt", cleaned)

    def get(self):
        """Download all form files and generate the package skeleton.

        The schema is mandatory: if it cannot be downloaded or is not a JSON
        object, the failure is printed and ``SystemExit(1)`` is raised. UI
        schema, client UI schema and template are optional; on failure a
        ``"{}"`` placeholder is written and the error is counted in the
        summary. An existing ``form.py`` is never overwritten.
        """
        self._written = []
        self._skipped = []
        self._errors = []

        console.print(
            f"Downloading from {self.uri} (version={self.version})"
        )

        # Schema is critical — abort on failure
        try:
            jsonschema = self._get_schema(self.uri)
        except DownloadError as exc:
            console.print(f"[bold red]FATAL: Server error for schema: {exc}[/bold red]")
            console.print("[bold red]Cannot proceed without schema. Aborting.[/bold red]")
            raise SystemExit(1)

        # A 200 response that is not a JSON object (e.g. an HTML login page)
        # is just as fatal as a failed download; abort the same way instead
        # of crashing with a traceback.
        try:
            js_schema = json.loads(jsonschema)
        except ValueError as exc:
            console.print(f"[bold red]FATAL: Schema is not valid JSON: {exc}[/bold red]")
            console.print("[bold red]Cannot proceed without schema. Aborting.[/bold red]")
            raise SystemExit(1) from exc
        if not isinstance(js_schema, dict):
            console.print("[bold red]FATAL: Schema is not a JSON object.[/bold red]")
            console.print("[bold red]Cannot proceed without schema. Aborting.[/bold red]")
            raise SystemExit(1)

        form_name = resolve_form_name(js_schema, self.uri)
        formatted = format_version(self.version)
        schema_dir_path = Path(self.path) / form_name
        form_dir_path = schema_dir_path / formatted
        form_dir_path.mkdir(parents=True, exist_ok=True)
        is_wizard = js_schema.get("type") == "wizard"

        # UI schema — non-critical
        try:
            ui_schema = self._get_ui_schema(self.uri)
        except DownloadError as exc:
            console.print(f"[yellow]WARNING: ui_schema failed: {exc}[/yellow]")
            ui_schema = "{}"
            self._errors.append("ui_schema")

        # Client UI schema — non-critical
        try:
            client_ui_schema = self._get_client_ui_schema(self.uri)
        except DownloadError as exc:
            console.print(f"[yellow]WARNING: client_ui_schema failed: {exc}[/yellow]")
            client_ui_schema = "{}"
            self._errors.append("client_ui_schema")

        self._write(form_dir_path / "schema.json", jsonschema)
        self._write(form_dir_path / "ui_schema.json", ui_schema)
        self._write(form_dir_path / "client_ui_schema.json", client_ui_schema)
        if is_wizard:
            self._write(form_dir_path / "template.pt", WIZARD_TEMPLATE_PT)
            self._write(form_dir_path / "wizard_template.pt", WIZARD_MODE_TEMPLATE_PT)
        else:
            # Template — non-critical
            template_url = f"{self.uri}/@@cpt-generator"
            if not self.is_dev:
                template_url += f"?version={self.version}"
            try:
                deform = self._fetch(template_url)
            except DownloadError as exc:
                console.print(f"[yellow]WARNING: template failed: {exc}[/yellow]")
                deform = "{}"
                self._errors.append("template")
            self._write(form_dir_path / "template.pt", deform)

        # Write form.py only if it doesn't exist (preserve user customizations)
        form_py_path = form_dir_path.parent / "form.py"
        py_version = "0.1" if self.is_dev else self.version
        if form_py_path.exists():
            self._skipped.append(str(form_py_path))
        elif is_wizard:
            self._write(
                form_py_path,
                WIZARD_TEMPLATE.format(
                    doc_type=form_name, version=py_version, package=self.package
                ),
            )
        else:
            self._write(
                form_py_path,
                FORM_TEMPLATE.format(doc_type=form_name, version=py_version),
            )

        self._write(form_dir_path / "__init__.py", "# Package")
        self._write(schema_dir_path / "__init__.py", "# Package")

        # Tests
        form_test_dir_path = form_dir_path / "tests"
        form_test_dir_path.mkdir(exist_ok=True)
        self._write(form_test_dir_path / "__init__.py", "# Package")
        # NOTE(review): FORM_TEST_TEMPLATE is written verbatim, whereas the
        # acceptance template below is formatted with doc_type — confirm the
        # test template carries no placeholder that should be filled in.
        self._write(
            form_test_dir_path / f"test_{form_name.lower()}.py",
            FORM_TEST_TEMPLATE,
        )
        self._write(
            form_test_dir_path / "test_acceptance.py",
            ACCEPTANCE_TEST_TEMPLATE.format(doc_type=form_name),
        )

        if is_wizard:
            self.download_wizard_steps(js_schema, form_dir_path)

        # Summary
        form_type = "wizard" if is_wizard else "form"
        parts = [
            f"[green]{form_name}[/green] ({form_type}, {formatted})",
            f"- [green]{len(self._written)} written[/green]",
        ]
        if self._skipped:
            parts.append(f"[yellow]{len(self._skipped)} skipped[/yellow]")
        if self._errors:
            parts.append(f"[red]{len(self._errors)} errors[/red]")
        rich.print(" ".join(parts))

    def download_wizard_steps(self, js_schema, form_dir_path):
        """Download all wizard steps into ``<form_dir_path>/steps/<step>/``.

        Step names are the keys of ``js_schema["schema"]["properties"]``.
        A step that fails with DownloadError or OSError is recorded and the
        remaining steps are still processed.
        """
        steps = list(js_schema.get("schema", {}).get("properties", {}).keys())
        steps_dir = form_dir_path / "steps"
        steps_dir.mkdir(exist_ok=True)
        step_errors = []

        for step_name in steps:
            console.print(f"  Step: {step_name}")
            try:
                step_dir = steps_dir / step_name
                step_dir.mkdir(exist_ok=True)

                if self.is_dev:
                    self._download_step_dev(step_name, step_dir)
                else:
                    self._download_step_versioned(step_name, step_dir)

                self._write(step_dir / "__init__.py", "# Package")
            except (DownloadError, OSError) as e:
                step_errors.append(f"{step_name}: {e}")
                self._errors.append(f"step:{step_name}")

        rich.print(
            f"  [cyan]{len(steps)} steps[/cyan]"
            + (
                f", [red]{len(step_errors)} errors: {', '.join(step_errors)}[/red]"
                if step_errors
                else ""
            )
        )

    def _download_step_versioned(self, step_name, step_dir):
        """Download step data from the wizard's version snapshot."""
        base = (
            f"{self.uri}/@@uvcx-wizard-step-version"
            f"?version={self.version}&step={step_name}"
        )
        self._write(
            step_dir / "schema.json",
            self._fetch(f"{base}&type=meta_schema"),
        )
        self._write(
            step_dir / "ui_schema.json",
            self._fetch(f"{base}&type=simple_ui_schema"),
        )
        self._write(
            step_dir / "client_ui_schema.json",
            self._fetch(f"{base}&type=ui_schema"),
        )
        self._write_step_template(step_dir, self._fetch(f"{base}&type=template"))

    def _download_step_dev(self, step_name, step_dir):
        """Download step data from live child Form endpoints (dev mode)."""
        step_url = f"{self.uri}/{step_name}"
        self._write(step_dir / "schema.json", self._get_schema(step_url))
        self._write(step_dir / "ui_schema.json", self._get_ui_schema(step_url))
        self._write(
            step_dir / "client_ui_schema.json", self._get_client_ui_schema(step_url)
        )
        self._write_step_template(
            step_dir, self._fetch(f"{step_url}/@@cpt-generator")
        )

download_wizard_steps(js_schema, form_dir_path)

Download all wizard steps.

Source code in src/form_download_helper/downloader.py
def download_wizard_steps(self, js_schema, form_dir_path):
    """Fetch every wizard step named in the schema into ``steps/<name>/``.

    Step names are the keys of ``js_schema["schema"]["properties"]``. A step
    that fails with DownloadError or OSError is recorded (locally and in
    ``self._errors``) and the loop carries on with the next step. A one-line
    summary is printed at the end.
    """
    step_names = list(js_schema.get("schema", {}).get("properties", {}).keys())
    steps_root = form_dir_path / "steps"
    steps_root.mkdir(exist_ok=True)
    failures = []

    for name in step_names:
        console.print(f"  Step: {name}")
        try:
            target = steps_root / name
            target.mkdir(exist_ok=True)

            # Dev mode reads the live child form; otherwise the version snapshot.
            fetch_step = (
                self._download_step_dev
                if self.is_dev
                else self._download_step_versioned
            )
            fetch_step(name, target)

            self._write(target / "__init__.py", "# Package")
        except (DownloadError, OSError) as exc:
            failures.append(f"{name}: {exc}")
            self._errors.append(f"step:{name}")

    summary = f"  [cyan]{len(step_names)} steps[/cyan]"
    if failures:
        summary += f", [red]{len(failures)} errors: {', '.join(failures)}[/red]"
    rich.print(summary)

derive_name_from_uri(uri)

Derive a form name from the URI slug when the server returns 'unknown_form'.

Takes the last path segment of the URI and converts hyphens to underscores to produce a valid Python-friendly directory/identifier name.

Example

https://example.de/community/guvh/anfrage-zahnarzt -> anfrage_zahnarzt

Source code in src/form_download_helper/naming.py
def derive_name_from_uri(uri: str) -> str:
    """Derive a form name from the URI slug when the server returns 'unknown_form'.

    Takes the last path segment of the URI and converts hyphens to underscores
    to produce a Python-friendly directory/identifier name. A query string or
    fragment is ignored, so it cannot leak into the name.

    Args:
        uri: The form URI.

    Returns:
        The last path segment with ``-`` replaced by ``_``.

    Example:
        https://example.de/community/guvh/anfrage-zahnarzt -> anfrage_zahnarzt
        https://example.de/community/guvh/anfrage-zahnarzt?version=1 -> anfrage_zahnarzt
    """
    # Cut off "#fragment" and "?query" first; they are not part of the path.
    path_part = uri.split("#", 1)[0].split("?", 1)[0]
    # Strip trailing slashes and take the last path segment
    slug = path_part.rstrip("/").rsplit("/", 1)[-1]
    return slug.replace("-", "_")

derive_package_name(path)

Derive package name from path, using project root detection.

Resolves the path to absolute, finds the project root by looking for pyproject.toml, setup.py, or setup.cfg, then derives the package name from the relative path.

Examples:

/project/src/myapp/forms -> myapp.forms (if src/ exists)
/project/myapp/forms -> myapp.forms (relative to project root)

Source code in src/form_download_helper/utils.py
def derive_package_name(path: Path) -> str:
    """Turn a filesystem path into a dotted package name.

    The path is resolved to an absolute path and the project root is looked
    up with ``find_project_root``. The package name is the path relative to
    ``<root>/src`` (when that directory exists and contains the path) or to
    the root itself, joined with dots.

    When no project root is found, or the path lies outside of it, the last
    two components of the absolute path are used instead.

    Examples:
        /project/src/myapp/forms -> myapp.forms (if src/ exists)
        /project/myapp/forms -> myapp.forms (relative to project root)
    """
    resolved = path.resolve()

    def tail_name() -> str:
        # Fallback: last two components, else the only one, else "package".
        components = resolved.parts
        if len(components) >= 2:
            return ".".join(components[-2:])
        if components:
            return components[-1]
        return "package"

    root = find_project_root(resolved)
    if root is None:
        return tail_name()

    # Prefer a src/ layout when the path actually lives below src/.
    src_root = root / "src"
    if src_root.exists() and resolved.is_relative_to(src_root):
        relative = resolved.relative_to(src_root)
    elif resolved.is_relative_to(root):
        relative = resolved.relative_to(root)
    else:
        # Path is outside the project
        return tail_name()

    return ".".join(relative.parts)

download(path, username, password, version, uri, package=None)

Download form schemas from OZG server.

Source code in src/form_download_helper/__init__.py
def download(
    path: Path,
    username: str,
    password: str,
    version: str,
    uri: str,
    package: Optional[str] = None,
):
    """Download form schemas from the OZG server.

    Builds a ``DownloadOZG`` from the arguments and runs its ``get()`` method.
    """
    downloader = DownloadOZG(
        path=path,
        uri=uri,
        username=username,
        password=password,
        version=version,
        package=package,
    )
    downloader.get()

ensure_valid_identifier(name)

Prefix names that start with a digit so they are valid Python identifiers.

Source code in src/form_download_helper/naming.py
def ensure_valid_identifier(name: str) -> str:
    """Return ``name``, prefixed with ``F_`` when it starts with a digit.

    Empty names are returned unchanged.
    """
    if not name:
        return name
    return f"F_{name}" if name[0].isdigit() else name

find_project_root(start_path)

Find project root by looking for pyproject.toml, setup.py, or setup.cfg.

Source code in src/form_download_helper/utils.py
def find_project_root(start_path: Path) -> Optional[Path]:
    """Return the nearest directory that contains a project-root marker.

    Starting at the resolved ``start_path`` and walking up through its
    parents, the first directory holding any entry of
    ``PROJECT_ROOT_MARKERS`` is returned; ``None`` if there is none.
    """
    origin = start_path.resolve()

    for candidate in (origin, *origin.parents):
        if any((candidate / marker).exists() for marker in PROJECT_ROOT_MARKERS):
            return candidate
    return None

resolve_form_name(js_schema, uri)

Return a usable form name, falling back to the URI slug if the server returned 'unknown_form'.

Source code in src/form_download_helper/naming.py
def resolve_form_name(js_schema: dict, uri: str) -> str:
    """Return a usable form name, falling back to the URI slug if needed.

    The fallback is used when the schema has no ``name``, when the server
    returned 'unknown_form', or when the name is empty or not a string.
    Otherwise the name would crash ``sanitize_name`` or produce an empty
    directory name.

    Args:
        js_schema: The parsed JSON schema.
        uri: The form URI the schema was downloaded from.

    Returns:
        The sanitized name, passed through ``ensure_valid_identifier``.
    """
    name = js_schema.get("name", UNKNOWN_FORM_NAME)
    if not isinstance(name, str) or not name or name == UNKNOWN_FORM_NAME:
        derived = derive_name_from_uri(uri)
        # {name!r} renders as 'unknown_form' for the usual case, so the
        # message is unchanged there and still accurate for None or ''.
        rich.print(
            f"[bold yellow]WARNING![/bold yellow] Server returned {name!r} "
            f"as form name. Deriving name from URI: [cyan]{derived}[/cyan]"
        )
        return ensure_valid_identifier(derived)
    return ensure_valid_identifier(sanitize_name(name))

sanitize_name(name)

Replace hyphens with underscores so names are valid Python identifiers.

Source code in src/form_download_helper/naming.py
def sanitize_name(name: str) -> str:
    """Return ``name`` with every hyphen turned into an underscore."""
    pieces = name.split("-")
    return "_".join(pieces)

Klassen

DownloadOZG

Die Hauptklasse für das Herunterladen von OZG-Formularen.

class DownloadOZG:
    def __init__(self, path, uri, username, password, version, package=None):
        """
        Initialize the OZG downloader.

        Args:
            path (str): Target directory for the downloads.
            uri (str): URI of the form.
            username (str): User name for HTTP basic authentication.
            password (str): Password for HTTP basic authentication.
            version (str): Version of the form. The value "dev" selects
                the live (unversioned) endpoints.
            package (str, optional): Python package name. Derived
                automatically from the path when not given.
        """

Methoden

get()
def get(self):
    """
    Download all form files and create the directory structure.

    Downloads:
    - JSON schema (@@meta-schema in dev mode, @@form-version-view otherwise)
    - UI schema (@@simple-ui-schema-view in dev mode,
      @@uvcx-version-view otherwise)
    - Client UI schema (@@ui-schema-view in dev mode, @@version-view otherwise)
    - Page template (@@cpt-generator) - non-wizard forms only

    Generates:
    - Python form class (form.py, only if it does not exist yet)
    - Test files (test_<formname>.py, test_acceptance.py)
    - Package structure (__init__.py)

    For wizard forms, the individual wizard steps are downloaded
    as well.

    Raises SystemExit(1) when the schema cannot be downloaded.
    """
download_wizard_steps()
def download_wizard_steps(self, js_schema, form_dir_path):
    """
    Download all wizard steps.

    For every step, schema.json, ui_schema.json, client_ui_schema.json
    and an __init__.py are written to a sub-directory of its own below
    "steps/". template.pt is written only when the server response does
    not start with "{". A failing step is reported and the remaining
    steps are still processed.

    Args:
        js_schema (dict): The parsed JSON schema of the wizard.
        form_dir_path (Path): Path to the version directory.
    """

Funktionen

download()

def download(
    path: Path,
    username: str,
    password: str,
    version: str,
    uri: str,
    package: Optional[str] = None,
):
    """
    Main function of the download process.

    Creates a DownloadOZG instance from the arguments and calls get().

    Args:
        path (Path): Target directory
        username (str): User name
        password (str): Password
        version (str): Form version
        uri (str): Form URI
        package (str, optional): Python package name

    Example:
        >>> download(Path("/tmp"), "user", "pass", "2", "https://example.com/form")
    """

resolve_form_name()

def resolve_form_name(js_schema: dict, uri: str) -> str:
    """
    Determine a usable form name.

    Falls back to the URI slug when the server returns
    'unknown_form' as the name.

    Args:
        js_schema (dict): The parsed JSON schema
        uri (str): The form URI

    Returns:
        str: A valid form name
    """

derive_name_from_uri()

def derive_name_from_uri(uri: str) -> str:
    """
    Derive a form name from the URI slug.

    Takes the last path segment of the URI and replaces
    hyphens with underscores.

    Example:
        >>> derive_name_from_uri("https://example.de/community/guvh/anfrage-zahnarzt")
        "anfrage_zahnarzt"
    """

ensure_valid_identifier()

def ensure_valid_identifier(name: str) -> str:
    """
    Prefix names that start with a digit with 'F_'.

    Names that do not start with a digit are returned unchanged.

    Example:
        >>> ensure_valid_identifier("90_form")
        "F_90_form"
    """

find_project_root()

def find_project_root(start_path: Path) -> Optional[Path]:
    """
    Find the project root by looking for marker files
    (pyproject.toml, setup.py, setup.cfg).

    The search starts at start_path and walks up through its parents.

    Args:
        start_path (Path): Directory where the search starts

    Returns:
        Optional[Path]: Path to the project root, or None
    """

derive_package_name()

def derive_package_name(path: Path) -> str:
    """
    Derive the Python package name from the path.

    Detects src/ layouts automatically and computes the
    path relative to the project root. Falls back to the last
    two path components when no project root is found.

    Examples:
        /project/src/myapp/forms -> myapp.forms
        /project/myapp/forms -> myapp.forms
    """

format_version()

def format_version(version):
    """
    Format a version number for use in directory names.

    NOTE(review): DownloadOZG.get() also passes the value "dev" to this
    function; the result for that input is not documented here - confirm.

    Args:
        version (int|float): Version number

    Returns:
        str: Formatted version (e.g. "v2_0")

    Example:
        >>> format_version(2)
        "v2_0"
        >>> format_version(2.5)
        "v2_5"
    """

main()

def main() -> int:
    """
    CLI entry point.

    Returns:
        int: Exit code (0 on success)
    """

Templates

FORM_TEMPLATE

Template für generierte Python-Formularklassen (Standard-Formulare):

# Source text of the generated form.py for standard (non-wizard) forms.
# DownloadOZG.get() fills it in with str.format(doc_type=..., version=...).
# The listing below is abbreviated.
FORM_TEMPLATE = """from ritter.browser import TemplateLoader
from uv.ozg.forms import DefaultDocumentEditForm as BaseOZGEditForm
from uvcreha.endpoints.browser.document import (
    DefaultDocumentEditForm as BaseREHAEditForm,
)
from uv.dynforms import dyn_form
from reha.prototypes.views.document import views

# ... Template-Inhalt
"""

WIZARD_TEMPLATE

Template für Wizard-Formulare.

WIZARD_TEMPLATE_PT / WIZARD_MODE_TEMPLATE_PT

Page-Templates für Wizard-Formulare (template.pt und wizard_template.pt).

FORM_TEST_TEMPLATE

Template für generierte Test-Dateien:

# Source text of the generated test_<formname>.py file.
# NOTE(review): the text contains a "%s" placeholder, but DownloadOZG.get()
# writes the constant as-is without substituting it - confirm whether the
# form name should be filled in before writing.
FORM_TEST_TEMPLATE = """def test_base(loaded_app):
    assert "%s" in loaded_app.document_stores["ozg"].keys()
"""

ACCEPTANCE_TEST_TEMPLATE

Template für Akzeptanz-Tests.

Konstanten

URI

Standard-URI für Schema-Abfragen:

# Default URI for schema queries.
URI = "https://formulare.uv-kooperation.de/ozg/rul/ozg-90/@@schema-view?version=1"

PROJECT_ROOT_MARKERS

Marker-Dateien zur Erkennung des Projekt-Roots:

# File names whose presence marks a directory as the project root;
# find_project_root() checks each directory for them.
PROJECT_ROOT_MARKERS = ("pyproject.toml", "setup.py", "setup.cfg")

HTTP Endpoints

Das Tool greift auf folgende Endpoints zu:

| Endpoint | Beschreibung | Parameter |
|---|---|---|
| @@meta-schema | JSON Schema des Formulars | version |
| @@simple-ui-schema-view | UI Schema des Formulars | version |
| @@cpt-generator | Page Template (Deform) | version |

Beispiel-Requests

# Fetch the schema.
# NOTE(review): the DownloadOZG listing fetches @@meta-schema without a
# version parameter (dev mode) and uses @@form-version-view?version=... for
# versioned downloads - confirm which request this example should show.
response = requests.get(
    f"{uri}/@@meta-schema?version={version}",
    auth=HTTPBasicAuth(username, password)
)

# Fetch the UI schema.
# NOTE(review): the DownloadOZG listing uses
# @@uvcx-version-view?version=...&schema=simple for versioned downloads.
response = requests.get(
    f"{uri}/@@simple-ui-schema-view?version={version}",
    auth=HTTPBasicAuth(username, password)
)

# Fetch the page template
response = requests.get(
    f"{uri}/@@cpt-generator?version={version}",
    auth=HTTPBasicAuth(username, password)
)