I have the custom model working on its own, but I wanted to see if I could make a threshold view for vision tracking of some retroreflective tape. If there is a better way to do thresholding in the DepthAI pipeline, I am all ears. When running the new pipeline with the Script node, I get one frame and then the output window freezes. The only difference between this and the working example code is the addition of the Script node to the pipeline, so it is entirely possible that I have done something wrong there or in the linking of the pipeline.
This could also have something to do with cv2.waitKey(). The example code runs fine without that line, and its video output shows the feed with detection boxes; with my pipeline, however, I get a completely black output when waitKey() is not used and only a single frame when it is used. Pressing a key does nothing to render the next frame.
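For context, the threshold view I am after is basically what this host-side snippet produces. It is just a rough sketch of the same idea as the commented-out cv2.threshold lines further down (threshold_view is a throwaway helper name), assuming the preview frame from getCvFrame() comes back as a standard 8-bit BGR image:

import cv2

def threshold_view(frame, thresh=240):
    # Grayscale first so the bright retroreflective returns stand out,
    # then binarize: pixels above `thresh` become 255, everything else 0
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, thresh, 255, cv2.THRESH_BINARY)
    return binary

# e.g. in the main loop:
#   cv2.imshow('threshold', threshold_view(inRgb.getCvFrame()))
#   cv2.waitKey(1)  # imshow only repaints while waitKey pumps the GUI event loop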
from pathlib import Path
import sys
import cv2
import depthai as dai
import numpy as np
import time
import argparse
import json
import blobconverter
# parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--model", help="Provide model name or model path for inference",
                    default='model/best_openvino_2022.1_6shave.blob', type=str)
parser.add_argument("-c", "--config", help="Provide config path for inference",
                    default='model/best.json', type=str)
args = parser.parse_args()
# parse config
configPath = Path(args.config)
if not configPath.exists():
    raise ValueError("Path {} does not exist!".format(configPath))
with configPath.open() as f:
    config = json.load(f)
nnConfig = config.get("nn_config", {})
# parse input shape
if "input_size" in nnConfig:
W, H = tuple(map(int, nnConfig.get("input_size").split('x')))
# extract metadata
metadata = nnConfig.get("NN_specific_metadata", {})
classes = metadata.get("classes", {})
coordinates = metadata.get("coordinates", {})
anchors = metadata.get("anchors", {})
anchorMasks = metadata.get("anchor_masks", {})
iouThreshold = metadata.get("iou_threshold", {})
confidenceThreshold = metadata.get("confidence_threshold", {})
print(metadata)
# parse labels
nnMappings = config.get("mappings", {})
labels = nnMappings.get("labels", {})
# get model path
nnPath = args.model
if not Path(nnPath).exists():
    print("No blob found at {}. Looking into DepthAI model zoo.".format(nnPath))
    nnPath = str(blobconverter.from_zoo(args.model, shaves=6, zoo_type="depthai", use_cache=True))
# sync outputs
syncNN = True
# Create pipeline
pipeline = dai.Pipeline()
pipeline.setOpenVINOVersion(dai.OpenVINO.Version.VERSION_2021_4)
# Define source and output
camRgb = pipeline.create(dai.node.ColorCamera)
detectionNetwork = pipeline.create(dai.node.YoloDetectionNetwork)
xoutRgb = pipeline.create(dai.node.XLinkOut)
#controlIn = pipeline.create(dai.node.XLinkIn)
nnOut = pipeline.create(dai.node.XLinkOut)
script = pipeline.create(dai.node.Script)
script.setScript("""
# Runs on-device: grab one frame, build a thresholded copy of its raw
# buffer, then forward the frame. Note that filtered_array is never
# written back (setData is commented out), so the original frame passes
# through unchanged. The 269999 count also only covers part of the
# 512x288 BGR preview buffer, which is 442368 bytes.
data = node.io['in'].get()
ptr = data.getData()
filtered_array = []
for i in range(269999):
    value = ptr[i]
    if value > 240:
        filtered_array.append(255)
    else:
        filtered_array.append(0)
#retval, threshed = cv2.threshold(data, 240, 255, cv2.THRESH_BINARY)
#newFrame = np.array(threshed)
#data.setData(filtered_array)
node.io['out'].send(data)
""")
# Properties
camRgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)
camRgb.setInterleaved(False)
camRgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
camRgb.setFps(60)
xoutRgb.setStreamName("rgb")
#controlIn.setStreamName("control")
nnOut.setStreamName("nn")
camRgb.setPreviewSize(512, 288)
# Network specific settings
detectionNetwork.setConfidenceThreshold(confidenceThreshold)
detectionNetwork.setNumClasses(classes)
detectionNetwork.setCoordinateSize(coordinates)
detectionNetwork.setAnchors(anchors)
detectionNetwork.setAnchorMasks(anchorMasks)
detectionNetwork.setIouThreshold(iouThreshold)
detectionNetwork.setBlobPath(nnPath)
detectionNetwork.setNumInferenceThreads(2)
detectionNetwork.input.setBlocking(False)
# Linking
camRgb.preview.link(script.inputs['in'])
script.outputs['out'].link(detectionNetwork.input)
detectionNetwork.passthrough.link(xoutRgb.input)
detectionNetwork.out.link(nnOut.input)
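# Topology: camRgb.preview -> Script -> YoloDetectionNetwork; the network's
# passthrough feeds the "rgb" stream and its detections feed the "nn" stream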
def frameNorm(frame, bbox):
    # Map normalized (0..1) bbox values to pixel coordinates on this frame
    normVals = np.full(len(bbox), frame.shape[0])
    normVals[::2] = frame.shape[1]
    return (np.clip(np.array(bbox), 0, 1) * normVals).astype(int)

def displayFrame(name, frame, detections, inDet):
    color = (255, 0, 0)
    for detection in detections:
        bbox = frameNorm(frame, (detection.xmin, detection.ymin, detection.xmax, detection.ymax))
        cv2.putText(frame, labels[detection.label], (bbox[0] + 10, bbox[1] + 20), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
        cv2.putText(frame, f"{int(detection.confidence * 100)}%", (bbox[0] + 10, bbox[1] + 40), cv2.FONT_HERSHEY_TRIPLEX, 0.5, 255)
        cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
        xCoord = int((bbox[2] + bbox[0]) / 2)
        yCoord = int((bbox[3] + bbox[1]) / 2)
        cv2.circle(frame, (xCoord, yCoord), 10, color, 1)
    cv2.imshow(name, frame)
    #cv2.waitKey()
    #retval, threshed = cv2.threshold(inRgb.getCvFrame(), 240, 255, cv2.THRESH_BINARY)
    #newFrame = np.array(threshed)
    #cv2.imshow('newFrame', newFrame)
# Connect to device and start pipeline
with dai.Device(pipeline) as device:
    # Output queues will be used to get the rgb frames and nn results from the outputs defined above
    qRgb = device.getOutputQueue(name="rgb", maxSize=4, blocking=False)
    qDet = device.getOutputQueue(name="nn", maxSize=4, blocking=False)
    while True:
        inRgb = qRgb.get()  # blocking call, will wait until new data has arrived
        inDet = qDet.get()
        if inRgb is not None:
            print(inRgb)
            frame = inRgb.getCvFrame()
            print(frame)
        if inDet is not None:
            detections = inDet.detections
        if frame is not None:
            displayFrame("rgb", frame, detections, inDet)
        if cv2.waitKey(1) == ord('q'):
            break
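For reference, the Script node samples in the DepthAI docs keep the script body in a while True loop, since the body otherwise runs top to bottom once and the node then sits idle. A minimal pass-through version of my script along those lines (just a sketch based on those samples, not something I have verified here) would be:

script.setScript("""
# Loop forever so a frame is forwarded on every iteration instead of once
while True:
    data = node.io['in'].get()   # wait for the next preview frame
    node.io['out'].send(data)    # pass it through unchanged
""")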