nndeploy.server.executor 源代码

# executor.py

import requests
import os
import logging
import json
import threading
from typing import Dict, Tuple, Any, List
from pathlib import Path
from urllib.parse import urlparse
from modelscope.hub.file_download import model_file_download

from nndeploy.dag import GraphRunner

[文档]class GraphExecutor: """Encapsulate nndeploy load and run logic"""
[文档] def __init__(self, resources, cache_type=False, cache_size=None): self.runner = GraphRunner() self.resources = resources
[文档] def interrupt_running(self): self.runner.cancel_running()
[文档] def execute(self, graph_json: Dict, task_id: str) -> Tuple[Dict, float]: name = graph_json.get("name_") if isinstance(graph_json, (dict, list)): graph_json = json.dumps(graph_json, ensure_ascii=False) return self.runner.run(graph_json, name, task_id)
[文档] def handle_urls(self, graph_json: Dict) -> Dict[str, str]: url_fields = { "image_url_": "images", "video_url_": "videos", "audio_url_": "audios", "model_url_": "models", "other_url_": "others" } result_paths = {} for field, subdir in url_fields.items(): if field not in graph_json: continue url_list = graph_json[field] if not isinstance(url_list, list): logging.warning(f"{field} is not a list") continue if not url_list: logging.info(f"{field} is an empty list, skipping.") continue for idx, url in enumerate(url_list): if not isinstance(url, str): logging.warning(f"{field}.{idx} is not a string.") continue try: type_, uri = url.split("@", 1) except ValueError: logging.warning(f"Invalid URL format in {field}.{idx}: {url}") continue save_dir = Path(self.resources) / subdir save_dir.mkdir(parents=True, exist_ok=True) local_path = None # === Handle modelscope === if type_ == "modelscope": try: model_id, file_path = uri.split(":", 1) local_path = model_file_download( model_id=model_id, file_path=file_path, local_dir=save_dir ) except Exception as e: logging.error(f"Failed to download modelscope model for {field}.{idx}: {e}") continue # === Handle http/https === elif type_ in ("http", "https"): try: parsed = urlparse(uri) filename = os.path.basename(parsed.path) or "downloaded_file" save_path = save_dir / filename if not save_path.exists(): logging.info(f"Downloading {uri} to {save_path}") r = requests.get(uri, stream=True, timeout=10) r.raise_for_status() with open(save_path, "wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) logging.info(f"Saved to {save_path}") else: logging.info(f"File already exists: {save_path}") local_path = save_path except Exception as e: logging.error(f"Failed to download http resource for {field}.{idx}: {e}") continue elif type_.startswith("template"): logging.info(f"Skip processing template resource for {field}.{idx}") continue else: logging.warning(f"Unsupported or failed to download resource for {field}.{idx} with type '{type_}'") # === Store result === if local_path: result_paths[f"{field}.{idx}"] = str(local_path) return result_paths