nndeploy.image.canny 源代码

import numpy as np
import cv2
from PIL import Image
import nndeploy.dag

# Canny edge detection node with numpy input/output
[文档]class CannyNumpy(nndeploy.dag.Node): """ Canny edge detection node (numpy input/output) Input: - np.ndarray, image in BGR format Output: - np.ndarray, single-channel or multi-channel binary edge map """
[文档] def __init__(self, name, inputs: list[nndeploy.dag.Edge] = None, outputs: list[nndeploy.dag.Edge] = None): super().__init__(name, inputs, outputs) super().set_key("nndeploy.image.CannyNumpy") super().set_desc("Canny edge detection (numpy input/output)") self.set_input_type(np.ndarray) self.set_output_type(np.ndarray) self.low_threshold = 100 self.high_threshold = 200 # New parameter: number of output channels, 1 for single-channel, 3 for RGB, 4 for RGBA self.return_num_channel = 3
[文档] def run(self): input_edge = self.get_input(0) image = input_edge.get(self) if image is None: return nndeploy.base.Status.err("Input cannot be None") if image.ndim == 3 and image.shape[2] == 3: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) else: gray = image edges = cv2.Canny(gray, self.low_threshold, self.high_threshold) # Check if multi-channel output is needed if self.return_num_channel > 1: edges = np.stack([edges]*self.return_num_channel, axis=2) output_edge = self.get_output(0) output_edge.set(edges) return nndeploy.base.Status.ok()
[文档] def serialize(self): import json json_str = super().serialize() json_obj = json.loads(json_str) json_obj["low_threshold"] = self.low_threshold json_obj["high_threshold"] = self.high_threshold json_obj["return_num_channel"] = self.return_num_channel return json.dumps(json_obj)
[文档] def deserialize(self, target: str): import json json_obj = json.loads(target) self.low_threshold = json_obj.get("low_threshold", 100) self.high_threshold = json_obj.get("high_threshold", 200) self.return_num_channel = json_obj.get("return_num_channel", 3) return super().deserialize(target)
[文档]class CannyNumpyCreator(nndeploy.dag.NodeCreator):
[文档] def __init__(self): super().__init__()
[文档] def create_node(self, name: str, inputs: list[nndeploy.dag.Edge], outputs: list[nndeploy.dag.Edge]): self.node = CannyNumpy(name, inputs, outputs) return self.node
canny_edge_numpy_creator = CannyNumpyCreator() nndeploy.dag.register_node("nndeploy.image.CannyNumpy", canny_edge_numpy_creator) # Canny edge detection node with PIL input/output
[文档]class CannyPIL(nndeploy.dag.Node): """ Canny edge detection node (PIL input/output) Input: - PIL.Image.Image, image in RGB format Output: - PIL.Image.Image, single-channel or multi-channel binary edge map """
[文档] def __init__(self, name, inputs: list[nndeploy.dag.Edge] = None, outputs: list[nndeploy.dag.Edge] = None): super().__init__(name, inputs, outputs) super().set_key("nndeploy.image.CannyPIL") super().set_desc("Canny edge detection (PIL input/output)") self.set_input_type(Image.Image) self.set_output_type(Image.Image) self.low_threshold = 100 self.high_threshold = 200 # New parameter: number of output channels, 1 for single-channel, 3 for RGB, 4 for RGBA self.return_num_channel = 3
[文档] def run(self): input_edge = self.get_input(0) pil_image = input_edge.get(self) if pil_image is None: return nndeploy.base.Status.err("Input cannot be None") if not isinstance(pil_image, Image.Image): return nndeploy.base.Status.err("Input must be of type PIL.Image.Image") image = np.array(pil_image) if image.ndim == 3 and image.shape[2] == 3: gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) else: gray = image edges = cv2.Canny(gray, self.low_threshold, self.high_threshold) # Check if multi-channel output is needed if self.return_num_channel == 3: edges = np.stack([edges]*3, axis=2) pil_edges = Image.fromarray(edges, mode="RGB") elif self.return_num_channel == 4: edges = np.stack([edges]*4, axis=2) pil_edges = Image.fromarray(edges, mode="RGBA") else: pil_edges = Image.fromarray(edges) output_edge = self.get_output(0) output_edge.set(pil_edges) return nndeploy.base.Status.ok()
[文档] def serialize(self): import json json_str = super().serialize() json_obj = json.loads(json_str) json_obj["low_threshold"] = self.low_threshold json_obj["high_threshold"] = self.high_threshold json_obj["return_num_channel"] = self.return_num_channel return json.dumps(json_obj)
[文档] def deserialize(self, target: str): import json json_obj = json.loads(target) self.low_threshold = json_obj.get("low_threshold", 100) self.high_threshold = json_obj.get("high_threshold", 200) self.return_num_channel = json_obj.get("return_num_channel", 3) return super().deserialize(target)
[文档]class CannyPILCreator(nndeploy.dag.NodeCreator):
[文档] def __init__(self): super().__init__()
[文档] def create_node(self, name: str, inputs: list[nndeploy.dag.Edge], outputs: list[nndeploy.dag.Edge]): self.node = CannyPIL(name, inputs, outputs) return self.node
canny_edge_pil_creator = CannyPILCreator() nndeploy.dag.register_node("nndeploy.image.CannyPIL", canny_edge_pil_creator)