nndeploy.codec.opencv 源代码


import nndeploy._nndeploy_internal as _C

import nndeploy.base
import nndeploy.device
import nndeploy.dag

import json

from PIL import Image
import numpy as np
from typing import List

import numpy as np
import cv2

[文档]class MakeNumpyGrid(nndeploy.dag.Node):
[文档] def __init__(self, name, inputs: [nndeploy.dag.Edge] = [], outputs: [nndeploy.dag.Edge] = []): super().__init__(name, inputs, outputs) self.set_key("nndeploy.codec.MakeNumpyGrid") self.set_desc("Concatenate multiple NumPy images (ndarray) into a grid") self.rows = 1 self.cols = 1 self.resize_h = -1 self.resize_w = -1 self.set_input_type(np.ndarray) # 动态输入:numpy.ndarray self.set_output_type(np.ndarray) self.set_dynamic_input(True)
def _ensure_3d(self, arr: np.ndarray) -> np.ndarray: """将 HxW 转为 HxWx1, 其他保持不变。""" if arr.ndim == 2: return arr[..., None] if arr.ndim == 3: return arr raise ValueError(f"输入数组维度应为2或3,得到 {arr.ndim}") def _resize_np(self, arr: np.ndarray, new_w: int, new_h: int) -> np.ndarray: """使用 cv2.resize 统一尺寸。""" arr3 = self._ensure_3d(arr) h, w, _ = arr3.shape interp = cv2.INTER_AREA if (new_w < w or new_h < h) else cv2.INTER_LINEAR out = cv2.resize(arr3, dsize=(new_w, new_h), interpolation=interp) if out.ndim == 2: # 单通道情况下,cv2 可能返回 HxW out = out[..., None] return out
[文档] def run(self) -> bool: try: # 收集所有输入边的 ndarray arrays: List[np.ndarray] = [] for i in range(len(self.get_all_input())): edge = self.get_input(i) arr = edge.get(self) if arr is not None: if not isinstance(arr, np.ndarray): print(f"MakeNumpyGrid: 输入类型必须为 numpy.ndarray, 得到 {type(arr)}") return nndeploy.base.Status.error() arrays.append(arr) expected = self.rows * self.cols if len(arrays) != expected: print(f"MakeNumpyGrid: 输入图像数量({len(arrays)})与网格(rows*cols={expected})不符") return nndeploy.base.Status.error() # 可选 resize,否则确保尺寸/通道一致 if self.resize_h != -1 and self.resize_w != -1: arrays = [self._resize_np(a, self.resize_w, self.resize_h) for a in arrays] else: arrays = [self._ensure_3d(a) for a in arrays] # 校验尺寸一致 h0, w0, c0 = arrays[0].shape for idx, a in enumerate(arrays): if a.shape != (h0, w0, c0): print(f"MakeNumpyGrid: 第 {idx} 个输入尺寸不一致,得到 {a.shape},期望 {(h0, w0, c0)}") return nndeploy.base.Status.error() # 预分配输出 (rows*h, cols*w, C) grid_h = self.rows * h0 grid_w = self.cols * w0 out = np.zeros((grid_h, grid_w, c0), dtype=arrays[0].dtype) # 逐格拷贝 for i, a in enumerate(arrays): r = i // self.cols c = i % self.cols y0, y1 = r * h0, (r + 1) * h0 x0, x1 = c * w0, (c + 1) * w0 out[y0:y1, x0:x1, :] = a # 若是单通道,输出为 HxW if c0 == 1: out = out[..., 0] # 输出到输出边 output_edge = self.get_output(0) output_edge.set(out) return nndeploy.base.Status.ok() except Exception as e: print(f"MakeNumpyGrid 节点运行失败: {e}") return nndeploy.base.Status.error()
[文档] def serialize(self): self.add_required_param("rows") self.add_required_param("cols") json_str = super().serialize() json_obj = json.loads(json_str) json_obj["rows"] = self.rows json_obj["cols"] = self.cols json_obj["resize_h"] = self.resize_h json_obj["resize_w"] = self.resize_w return json.dumps(json_obj, ensure_ascii=False, indent=2)
[文档] def deserialize(self, target: str): try: json_obj = json.loads(target) if "rows" in json_obj: self.rows = int(json_obj["rows"]) if "cols" in json_obj: self.cols = int(json_obj["cols"]) if "resize_h" in json_obj: self.resize_h = int(json_obj["resize_h"]) if "resize_w" in json_obj: self.resize_w = int(json_obj["resize_w"]) return super().deserialize(target) except Exception as e: print(f"MakeNumpyGrid 节点反序列化失败: {e}") return nndeploy.base.Status.error()
[文档]class MakeNumpyGridCreator(nndeploy.dag.NodeCreator):
[文档] def __init__(self): super().__init__()
[文档] def create_node(self, name: str, inputs: list[nndeploy.dag.Edge], outputs: list[nndeploy.dag.Edge]): self.node = MakeNumpyGrid(name, inputs, outputs) return self.node
make_numpy_grid_node_creator = MakeNumpyGridCreator() nndeploy.dag.register_node("nndeploy.codec.MakeNumpyGrid", make_numpy_grid_node_creator)