nndeploy.server.frontend 源代码

from __future__ import annotations

import argparse
import logging
import sys
import tempfile
import zipfile
import re
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from typing import Optional, TypedDict, Tuple
from urllib.parse import urlsplit, urlunsplit

import requests
from typing_extensions import NotRequired

# ---------------------------- Constants ----------------------------

REQUEST_TIMEOUT = 60  # seconds
FRONTEND_ROOT = Path.cwd() / "frontend"

# Used when caller passes the default bang-string "!" (auto resolve via config)
DEFAULT_PROVIDER = ("nndeploy", "nndeploy_frontend", "v1.4.0")
DEFAULT_VERSION_STRING = "!"

GITHUB_HOST = "github.com"
GITEE_HOST = "gitee.com"
GITHUB_API_BASE = "https://api.github.com"


# ---------------------------- Typed dicts ----------------------------

[文档]class Asset(TypedDict): name: str browser_download_url: str
[文档]class Release(TypedDict): tag_name: str prerelease: bool assets: NotRequired[list[Asset]]
# ---------------------------- Provider ----------------------------
[文档]@dataclass class FrontEndProvider: owner: str repo: str @property def _base(self) -> str: return f"{GITHUB_API_BASE}/repos/{self.owner}/{self.repo}/releases" @cached_property def latest(self) -> Release: r = requests.get(f"{self._base}/latest", timeout=REQUEST_TIMEOUT) if r.status_code == 404: # Some repos don't expose /latest; fall back to the releases list r = requests.get(self._base, timeout=REQUEST_TIMEOUT) r.raise_for_status() releases: list[Release] = r.json() if not releases: raise RuntimeError("no releases found") return releases[0] r.raise_for_status() return r.json()
[文档] def by_tag(self, ver: str) -> Release: r = requests.get(f"{self._base}/tags/{ver}", timeout=REQUEST_TIMEOUT) if r.status_code == 404: # Fallback: iterate all releases and match tag_name r = requests.get(self._base, timeout=REQUEST_TIMEOUT) r.raise_for_status() for rel in r.json(): if rel["tag_name"] in (ver, f"v{ver}"): return rel raise RuntimeError(f"Release {ver} not found") r.raise_for_status() return r.json()
# ---------------------------- Download helpers ---------------------------- def _swap_host(url: str, new_host: str) -> str: parts = urlsplit(url) return urlunsplit((parts.scheme, new_host, parts.path, parts.query, parts.fragment)) def _stream_download(url: str, dest_dir: Path) -> None: with tempfile.TemporaryFile() as tmp: r = requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) r.raise_for_status() for chunk in r.iter_content(8192): if chunk: tmp.write(chunk) tmp.seek(0) with zipfile.ZipFile(tmp) as zf: zf.extractall(dest_dir) def _download_with_fallbacks(urls: list[str], dest: Path) -> None: last_err: Optional[Exception] = None for u in urls: try: logging.info("download %s%s", u, dest) _stream_download(u, dest) return except Exception as e: logging.warning("download failed: %s", e) last_err = e raise RuntimeError(f"all download attempts failed; last error: {last_err}") def _download_via_release(rel: Release, dest: Path, asset_name: str = "dist.zip") -> None: """ Download via GitHub API release object. Priority: exact 'asset_name' → single .zip asset → error. """ assets = rel.get("assets", []) or [] asset = next((a for a in assets if a["name"] == asset_name), None) if asset is None: zips = [a for a in assets if a["name"].lower().endswith(".zip")] if len(zips) == 1: asset = zips[0] else: names = [a["name"] for a in assets] raise RuntimeError(f"asset '{asset_name}' not found; available assets: {names}") browser_url = asset["browser_download_url"] try_urls = [] try: try_urls.append(_swap_host(browser_url, GITEE_HOST)) except Exception: pass try_urls.append(browser_url) _download_with_fallbacks(try_urls, dest) # ---------------------------- Config + version utils ---------------------------- def _semver_tuple(v: str) -> Tuple[int, int, int]: """ Parse 'vX.Y.Z[-suffix]' into (X, Y, Z); non-parsable parts become 0. """ v = v.strip().lstrip("v") core = re.split(r"[-+]", v, 1)[0] parts = core.split(".") out = [] for i in range(3): try: out.append(int(parts[i])) except Exception: out.append(0) return tuple(out) # type: ignore[return-value] def _cmp_ver(a: str, b: str) -> int: ta, tb = _semver_tuple(a), _semver_tuple(b) return (ta > tb) - (ta < tb) def _match_constraint(nndeploy_ver: str, expr: str) -> bool: """ Support constraint expressions like '>=0.2.12,<0.3.0' or '>=0.3.0'. All comma-separated conditions must hold. """ nv = nndeploy_ver for cond in [c.strip() for c in expr.split(",") if c.strip()]: m = re.match(r"(>=|<=|>|<|==)?\s*v?(\d+\.\d+\.\d+)", cond) if not m: return False op, ver = m.groups() op = op or "==" cmpres = _cmp_ver(nv, ver) ok = { "==": cmpres == 0, ">=": cmpres >= 0, "<=": cmpres <= 0, ">": cmpres > 0, "<": cmpres < 0, }[op] if not ok: return False return True def _load_versions_config() -> dict: """ Import CONFIG from a sibling 'version.py'. If running inside a package (python -m pkg.module), a relative import is also attempted. """ CONFIG = None try: # Script-style import (same directory, not a package) from version import CONFIG as _CONF # type: ignore CONFIG = _CONF except Exception: try: # Package-style relative import from .version import CONFIG as _CONF # type: ignore CONFIG = _CONF except Exception as e: raise RuntimeError("version.py not found or import failed") from e if not isinstance(CONFIG, dict): raise RuntimeError("CONFIG not found or not a dict in version.py") if CONFIG.get("schema") != 1: raise RuntimeError("version CONFIG schema unsupported or missing (expect 1)") return CONFIG def _normalize_version(ver: Optional[str]) -> Optional[str]: """ Normalize nndeploy.__version__ into a plain 'X.Y.Z' string. Handles cases like 'nndeploy 2.6.1' → '2.6.1'. """ if not ver: return None parts = ver.strip().split() if len(parts) > 1 and re.match(r"\d+\.\d+\.\d+", parts[-1]): return parts[-1] return ver.strip() def _normalize_frontend_entry(x: object) -> dict: """ Normalize various frontend config shapes into {'tag': str, 'asset': Optional[str]}. Supported inputs: - "v1.4.0" - {"frontend": "v1.4.0", "asset": "..."} - {"frontend": {"tag": "v1.4.0", "asset": "..."}} - {"tag": "v1.4.0", "asset": "..."} """ if x is None: return {} if isinstance(x, str): return {"tag": x} if isinstance(x, dict): # Case: {"frontend": "v1.4.0", "asset": "..."} if isinstance(x.get("frontend"), str): return {"tag": x["frontend"], "asset": x.get("asset")} # Case: {"frontend": {"tag": "...", "asset": "..."}} (+ optional top-level asset) if isinstance(x.get("frontend"), dict): fe = x["frontend"] return {"tag": fe.get("tag") or fe.get("frontend"), "asset": fe.get("asset") or x.get("asset")} # Case: already normalized child: {"tag": "...", "asset": "..."} if "tag" in x or "asset" in x: return {"tag": x.get("tag"), "asset": x.get("asset")} return {} def _resolve_frontend_from_config() -> tuple[str, str, str, str]: """ Return (owner, repo, tag, asset) for the frontend bundle. Resolution order: exact versions → range rules → fallback. """ cfg = _load_versions_config() # Provider (no asset here). Allow either flat default_provider or sectioned default_provider["frontend"]. defprov = (cfg.get("default_provider") or {}).get("frontend") or cfg.get("default_provider") or {} owner = defprov.get("owner") or DEFAULT_PROVIDER[0] repo = defprov.get("repo") or DEFAULT_PROVIDER[1] # Resolve nndeploy version (normalize if it contains a prefix). nndeploy_ver: Optional[str] = None try: import nndeploy # type: ignore raw_ver = getattr(nndeploy, "__version__", None) nndeploy_ver = _normalize_version(raw_ver) except Exception: pass chosen: dict = {} if nndeploy_ver: # Exact table: CONFIG["versions"][nndeploy_ver] vermap = cfg.get("versions", {}) or {} hit = vermap.get(nndeploy_ver) if hit is not None: fe = hit.get("frontend", hit) if isinstance(hit, dict) else hit chosen = _normalize_frontend_entry(fe) # Range rules: first matching rule wins if not chosen: for r in cfg.get("ranges", []) or []: expr = r.get("nndeploy") if expr and _match_constraint(nndeploy_ver, expr): fe = r.get("frontend", r) chosen = _normalize_frontend_entry(fe) if chosen: break # Fallback block if not chosen: fb = cfg.get("fallback") or {} fe = fb.get("frontend", fb) chosen = _normalize_frontend_entry(fe) tag = chosen.get("tag") or chosen.get("frontend") or DEFAULT_PROVIDER[2] asset = chosen.get("asset") or "dist.zip" return owner, repo, str(tag), str(asset) # ---------------------------- Public manager ----------------------------
[文档]class FrontendManager: """ Resolve and download the correct frontend bundle (dist.zip or a configured asset) based on the running nndeploy version and the mapping in version.py::CONFIG. """ VERSION_RE = re.compile(r"^([\w-]+)/([\w_.-]+)@(v?\d+\.\d+\.\d+|latest)$")
[文档] @classmethod def init_frontend(cls, version_string: str = DEFAULT_VERSION_STRING) -> str: """ Entry point. Creates cache root and dispatches to implementation. - version_string = "!" → auto resolve via CONFIG and nndeploy.__version__ - version_string = "owner/repo@vX.Y.Z" → direct fetch of that release tag """ FRONTEND_ROOT.mkdir(parents=True, exist_ok=True) try: return cls._impl(version_string) except Exception as exc: logging.error("init front end failed: %s", exc, exc_info=True) sys.exit(1)
@classmethod def _impl(cls, ver_str: str) -> str: # Resolve owner/repo/tag/asset either from CONFIG ("!") or explicit "owner/repo@tag" if ver_str == DEFAULT_VERSION_STRING: owner, repo, tag, asset = _resolve_frontend_from_config() else: m = cls.VERSION_RE.match(ver_str) if m is None: raise argparse.ArgumentTypeError(f"illegal string: {ver_str}") owner, repo, tag = m.groups() asset = "dist.zip" # default asset when caller specifies an explicit tag # Direct download path for non-latest tags if tag != "latest": dest = FRONTEND_ROOT / f"{owner}_{repo}" / tag.lstrip("v") dist_dir = dest / "dist" if dist_dir.exists(): logging.info("use cached front end: %s", dist_dir) return str(dest) gitee_url = f"https://{GITEE_HOST}/{owner}/{repo}/releases/download/{tag}/{asset}" github_url = f"https://{GITHUB_HOST}/{owner}/{repo}/releases/download/{tag}/{asset}" dest.mkdir(parents=True, exist_ok=True) try: _download_with_fallbacks([gitee_url, github_url], dest) return str(dest) except Exception as err: # Best-effort cleanup for empty dest on failure try: dest.rmdir() except Exception as e: logging.debug("cleanup empty dir failed: %s", e) logging.warning("direct download attempts failed: %s, rolling up to API", err) # API fallback (latest or when direct download failed) provider = FrontEndProvider(owner, repo) rel = provider.latest if tag == "latest" else provider.by_tag(tag) semver = rel["tag_name"].lstrip("v") dest = FRONTEND_ROOT / f"{owner}_{repo}" / semver if not dest.exists(): logging.info("API download front end assets %s/%s@%s%s", owner, repo, semver, dest) dest.mkdir(parents=True, exist_ok=True) _download_via_release(rel, dest, asset_name=asset) return str(dest)
[文档] @staticmethod def templates_path() -> Optional[str]: return None
[文档] @staticmethod def embedded_docs_path() -> Optional[str]: return None