I am using 3 machine vision USB FLIR Camera (Blackfly S BFS-U3-31S4C) and 4 3D GigE Line scan camera C6-2040-GigE. When I am running frame capture script for machine vision camera, there are no errors coming.
But when I run both machine vision cameras and 3D Cameras, Sometimes Following error is coming in machine vision script and or it closes without error.
Server details: OS : Ubuntu Intel® Xeon(R) Gold 5220 CPU @ 2.20GHz × 72 Ram : 128 GB
Error :
python3: /vcpkg/installed/x64-linux/include/boost/smart_ptr/shared_ptr.hpp:786: typename boost::detail::sp_member_access<T>::type boost::shared_ptr<T>::operator->() const [with T = GenTL::DataStream; typename boost::detail::sp_member_access<T>::type = GenTL::DataStream*]: Assertion
px != 0' failed.
Aborted (core dumped)
`
Following is my machine vision camera code.
import os
import PySpin
import sys
import shutil
import time
import numpy as np
import cv2
import datetime
import pymysql
import subprocess
savePath1 = "/home/pwil-server/Insightzz/code/FrameCapture/"
#For logging error
import logging
import traceback
#Logging module
camlogger = None
SIDE2_DEVICE = "21236088"
TOP_DEVICE = "21266010"
BOTTOM_DEVICE = "21266009"
fps = 15.0
topExp = 100.00
bottomExp = 100.00
side2Exp = 400.00
def acquire_images(cam_list):
"""
This function acquires and saves 10 images from each device.
:param cam_list: List of cameras
:type cam_list: CameraList
:return: True if successful, False otherwise.
:rtype: bool
"""
print('*** IMAGE ACQUISITION ***\n')
try:
result = True
for i, cam in enumerate(cam_list):
camSerialNumber = getSerialNumber(cam)
if camSerialNumber == TOP_DEVICE or camSerialNumber == SIDE2_DEVICE or camSerialNumber == BOTTOM_DEVICE:
# Set acquisition mode to continuous
node_acquisition_mode = PySpin.CEnumerationPtr(cam.GetNodeMap().GetNode('AcquisitionMode'))
if not PySpin.IsAvailable(node_acquisition_mode) or not PySpin.IsWritable(node_acquisition_mode):
print('Unable to set acquisition mode to continuous (node retrieval; camera %d). Aborting... \n' % i)
return False
node_acquisition_mode_continuous = node_acquisition_mode.GetEntryByName('Continuous')
if not PySpin.IsAvailable(node_acquisition_mode_continuous) or not PySpin.IsReadable(
node_acquisition_mode_continuous):
print('Unable to set acquisition mode to continuous (entry \'continuous\' retrieval %d). \
Aborting... \n' % i)
return False
acquisition_mode_continuous = node_acquisition_mode_continuous.GetValue()
node_acquisition_mode.SetIntValue(acquisition_mode_continuous)
print('Camera %d acquisition mode set to continuous...' % i)
# Begin acquiring images
node_device_serial_number = PySpin.CStringPtr(cam.GetTLDeviceNodeMap().GetNode('DeviceSerialNumber'))
device_serial_number = node_device_serial_number.GetValue()
# for keeping rgb value constant
ptrRgbTransformLightSource = PySpin.CEnumerationPtr(cam.GetNodeMap().GetNode("RgbTransformLightSource"))
ptrRgbTransformationLightSourceWarm = ptrRgbTransformLightSource.GetEntryByName("CoolFluorescent4000K") #Daylight5000K #WarmFluorescent3000K #CoolFluorescent4000K
ptrRgbTransformLightSource.SetIntValue(ptrRgbTransformationLightSourceWarm.GetValue())
print(f"device_serial_number : {device_serial_number}")
print(device_serial_number)
if device_serial_number == SIDE2_DEVICE:
try:
cam.GainAuto.SetValue(PySpin.GainAuto_Off)
cam.Gain.SetValue(27)
except Exception as e:
pass
if not configure_exposure(cam, side2Exp):
return False
try:
cam.AcquisitionFrameRateAuto = "Off"
cam.AcquisitionFrameRateEnable.SetValue(True)
cam.AcquisitionFrameRate.SetValue(fps)
cam.AcquisitionFrameRate_set = fps
except Exception as e:
print(f"AcquisitionFrameRate SIDE2 : {e}")
camlogger.critical(f"AcquisitionFrameRate SIDE2 : {e}")
if device_serial_number == TOP_DEVICE:
try:
cam.GainAuto.SetValue(PySpin.GainAuto_Off)
cam.Gain.SetValue(25)
except Exception as e:
pass
if not configure_exposure(cam, topExp):
return False
try:
cam.AcquisitionFrameRateAuto = "Off"
cam.AcquisitionFrameRateEnable.SetValue(True)
cam.AcquisitionFrameRate.SetValue(fps)
cam.AcquisitionFrameRate_set = fps
except Exception as e:
print(f"AcquisitionFrameRate top : {e}")
camlogger.critical(f"AcquisitionFrameRate top : {e}")
if device_serial_number == BOTTOM_DEVICE:
try:
cam.GainAuto.SetValue(PySpin.GainAuto_Off)
cam.Gain.SetValue(25)
except Exception as e:
pass
if not configure_exposure(cam, bottomExp): #3500
return False
try:
cam.AcquisitionFrameRateAuto = "Off"
cam.AcquisitionFrameRateEnable.SetValue(True)
cam.AcquisitionFrameRate.SetValue(fps)
cam.AcquisitionFrameRate_set = fps
except Exception as e:
print(f"AcquisitionFrameRate bottom : {e}")
camlogger.critical(f"AcquisitionFrameRate bottom : {e}")
cam.BeginAcquisition()
print('Camera %d started acquiring images...' % i)
print()
side2_once1 = True
side2_once2 = True
top_once1 = True
top_once2 = True
bottom_once1 = True
bottom_once2 = True
cameras=[]
while True:
date=datetime.datetime.now()
try:
t = int(time.time()*1000)
for i, cam in enumerate(cam_list):
camSerialNumber = getSerialNumber(cam)
if camSerialNumber == TOP_DEVICE or camSerialNumber == SIDE2_DEVICE or camSerialNumber == BOTTOM_DEVICE:
# Retrieve device serial number for filename
node_device_serial_number = PySpin.CStringPtr(cam.GetTLDeviceNodeMap().GetNode('DeviceSerialNumber'))
if PySpin.IsAvailable(node_device_serial_number) and PySpin.IsReadable(node_device_serial_number):
device_serial_number = node_device_serial_number.GetValue()
# print('Camera %d serial number set to %s...' % (i, device_serial_number))
# Retrieve next received image and ensure image completion
image_result = cam.GetNextImage(1000)
if image_result.IsIncomplete():
print('Image incomplete with image status %d ... \n' % image_result.GetImageStatus())
else:
try:
# Print image information
width = image_result.GetWidth()
height = image_result.GetHeight()
#print('Camera %d grabbed image %d, width = %d, height = %d' % (i, n, width, height))
# Get image data as NumPy array
image_array = image_result.GetNDArray()
image_converted = cv2.cvtColor(image_array, cv2.COLOR_BAYER_BG2BGR)#COLOR_BayerBG2RGB
if device_serial_number == SIDE2_DEVICE:
try:
filename = 'Side2/IMG_'+".jpg"
cv2.imwrite(f"{savePath1}Side2/IMG_.jpg",image_converted)
shutil.move(f"{savePath1}Side2/IMG_.jpg", f"{savePath1}Side2/TMP/IMG_.jpg")
#updating cam health table
side2_once1 = True
if side2_once2 == True:
update_cam_status("LEFT_POST","YES")
camlogger.critical("LEFT_POST CAM is connected")
print("LEFT_POST CAM is connected")
side2_once2 = False
except Exception as e:
print(e)
camlogger.critical("SIDE2_DEVICE 2 is not connected")
if device_serial_number == TOP_DEVICE:
try:
filename = 'Top/IMG_'+".jpg"
cv2.imwrite(f"{savePath1}Top/IMG_.jpg",image_converted)
shutil.move(f"{savePath1}Top/IMG_.jpg", f"{savePath1}Top/TMP/IMG_.jpg")
#updating cam health table
top_once1 = True
if top_once2 == True:
update_cam_status("TOP_POST","YES")
camlogger.critical("TOP CAM is connected")
print("TOP CAM is connected")
top_once2 = False
except Exception as e:
print(e)
camlogger.critical("TOP_DEVICE 2 is not connected")
if device_serial_number == BOTTOM_DEVICE:
try:
filename = 'Bottom/IMG_'+".jpg"
cv2.imwrite(f"{savePath1}Bottom/IMG_.jpg",image_converted)
shutil.move(f"{savePath1}Bottom/IMG_.jpg", f"{savePath1}Bottom/TMP/IMG_.jpg")
#updating cam health table
bottom_once1 = True
if bottom_once2 == True:
update_cam_status("BOTTOM_POST","YES")
camlogger.critical("BOTTOM CAM is connected")
print("BOTTOM CAM is connected")
bottom_once2 = False
except Exception as e:
print(e)
camlogger.critical("BOTTOM_DEVICE is not connected")
except Exception as e:
print(e)
# camlogger.critical("Image upload eror is not e")
# Release image
image_result.Release()
time.sleep(0.05)
print(f'time for one loop : {int(time.time()*1000) - t}')
except PySpin.SpinnakerException as ex:
if side2_once1:
side2_once2 = True
update_cam_status("LEFT_POST","NO")
camlogger.critical("LEFT_POST CAM is not connected")
camlogger.critical(ex)
side2_once2 = True
if top_once1:
top_once2 = True
update_cam_status("TOP_POST","NO")
camlogger.critical("TOP CAM is not connected")
camlogger.critical(ex)
top_once2 = True
if bottom_once1:
bottom_once2 = True
update_cam_status("BOTTOM_POST","NO")
camlogger.critical("BOTTOM CAM is not connected")
camlogger.critical(ex)
bottom_once2 = True
print('Error: %s' % ex)
camlogger.critical('Error: %s' % ex)
result = False
except PySpin.SpinnakerException as ex:
#update_cam_status("LEFT_POST","NO")
camlogger.critical("LEFT_POST CAM is not connected")
print("LEFT_POST CAM is connected")
#update_cam_status("TOP_POST","NO")
camlogger.critical("TOP CAM is not connected")
print("TOP CAM is connected")
#update_cam_status("BOTTOM_POST","NO")
camlogger.critical("BOTTOM CAM is not connected")
print("BOTTOM CAM is connected")
camlogger.critical('Error: %s' % ex)
print('Error: %s' % ex)
camlogger.critical("while loop error | calling restartFrameCapturePath",ex)
# subprocess.call(["sh", restartFrameCapturePath])
result = False
for cam in cam_list:
camSerialNumber = getSerialNumber(cam)
if camSerialNumber == TOP_DEVICE or camSerialNumber == SIDE2_DEVICE or camSerialNumber == BOTTOM_DEVICE:
cam.EndAcquisition()
return result
def configure_exposure(cam, exposure_value):
print('*** CONFIGURING EXPOSURE ***\n')
try:
result = True
if cam.ExposureAuto.GetAccessMode() != PySpin.RW:
print('Unable to disable automatic exposure. Aborting...')
return False
cam.ExposureAuto.SetValue(PySpin.ExposureAuto_Off)
print('Automatic exposure disabled...')
if cam.ExposureTime.GetAccessMode() != PySpin.RW:
print('Unable to set exposure time. Aborting...')
return False
# Ensure desired exposure time does not exceed the maximum
exposure_time_to_set = exposure_value
exposure_time_to_set = min(cam.ExposureTime.GetMax(), exposure_time_to_set)
cam.ExposureTime.SetValue(exposure_time_to_set)
print('Shutter time set to %s us...\n' % exposure_time_to_set)
except PySpin.SpinnakerException as ex:
camlogger.critical("configure_exposure error | calling restartFrameCapturePath",ex)
subprocess.call(["sh", restartFrameCapturePath])
print('Error: %s' % ex)
result = False
camlogger.critical("configure_exposure error",ex)
return result
def print_device_info(nodemap, cam_num):
"""
This function prints the device information of the camera from the transport
layer; please see NodeMapInfo example for more in-depth comments on printing
device information from the nodemap.
:param nodemap: Transport layer device nodemap.
:param cam_num: Camera number.
:type nodemap: INodeMap
:type cam_num: int
:returns: True if successful, False otherwise.
:rtype: bool
"""
print('Printing device information for camera %d... \n' % cam_num)
try:
result = True
node_device_information = PySpin.CCategoryPtr(nodemap.GetNode('DeviceInformation'))
if PySpin.IsAvailable(node_device_information) and PySpin.IsReadable(node_device_information):
features = node_device_information.GetFeatures()
for feature in features:
node_feature = PySpin.CValuePtr(feature)
print('%s: %s' % (node_feature.GetName(),
node_feature.ToString() if PySpin.IsReadable(node_feature) else 'Node not readable'))
else:
print('Device control information not available.')
print()
except PySpin.SpinnakerException as ex:
print('Error: %s' % ex)
camlogger.critical("print_device_info error",ex)
return False
return result
def getSerialNumber(cam):
try:
device_serial_number = ''
nodemap_tldevice = cam.GetTLDeviceNodeMap()
node_device_serial_number = PySpin.CStringPtr(nodemap_tldevice.GetNode('DeviceSerialNumber'))
if PySpin.IsAvailable(node_device_serial_number) and PySpin.IsReadable(node_device_serial_number):
device_serial_number = node_device_serial_number.GetValue()
# print('Device serial number retrieved as %s...' % device_serial_number)
return device_serial_number
except Exception as e:
print(e)
camlogger.critical("getSerialNumber error",e)
def run_multiple_cameras(cam_list):
try:
result = True
print('*** DEVICE INFORMATION ***\n')
device_list = []
for i, cam in enumerate(cam_list):
camSerialNumber = getSerialNumber(cam)
print(camSerialNumber)
if camSerialNumber == TOP_DEVICE or camSerialNumber == SIDE2_DEVICE or camSerialNumber == BOTTOM_DEVICE:
# Retrieve TL device nodemap
nodemap_tldevice = cam.GetTLDeviceNodeMap()
#adding device number to the device list
device_list.append(getSerialNumber(cam))
# Print device information
result &= print_device_info(nodemap_tldevice, i)
for device in device_list:
if SIDE2_DEVICE in device_list:
update_cam_status("LEFT_POST","YES")
camlogger.critical("LEFT_POST CAM is connected")
print("LEFT_POST CAM is connected")
if TOP_DEVICE in device_list:
update_cam_status("TOP_POST","YES")
camlogger.critical("TOP CAM is connected")
print("TOP CAM is connected")
if BOTTOM_DEVICE in device_list:
update_cam_status("BOTTOM_POST","YES")
camlogger.critical("BOTTOM CAM is connected")
print("BOTTOM CAM is connected")
# Initialize each camera
#
# *** NOTES ***
# You may notice that the steps in this function have more loops with
# less steps per loop; this contrasts the AcquireImages() function
# which has less loops but more steps per loop. This is done for
# demonstrative purposes as both work equally well.
#
# *** LATER ***
# Each camera needs to be deinitialized once all images have been
# acquired.
for i, cam in enumerate(cam_list):
camSerialNumber = getSerialNumber(cam)
if camSerialNumber == TOP_DEVICE or camSerialNumber == SIDE2_DEVICE or camSerialNumber == BOTTOM_DEVICE:
# Initialize camera
cam.Init()
# Acquire images on all cameras
result &= acquire_images(cam_list)
# Deinitialize each camera
#
# *** NOTES ***
# Again, each camera must be deinitialized separately by first
# selecting the camera and then deinitializing it.
for cam in cam_list:
camSerialNumber = getSerialNumber(cam)
if camSerialNumber == TOP_DEVICE or camSerialNumber == SIDE2_DEVICE or camSerialNumber == BOTTOM_DEVICE:
# Deinitialize camera
cam.DeInit()
# Release reference to camera
# NOTE: Unlike the C++ examples, we cannot rely on pointer objects being automatically
# cleaned up when going out of scope.
# The usage of del is preferred to assigning the variable to None.
del cam
except PySpin.SpinnakerException as ex:
print('Error: %s' % ex)
result = False
camlogger.critical("main error",ex)
camlogger.critical("main error | calling restartFrameCapturePath",ex)
# subprocess.call(["sh", restartFrameCapturePath])
return result
def main():
try:
try:
test_file = open('test.txt', 'w+')
except IOError:
print('Unable to write to current directory. Please check permissions.')
input('Press Enter to exit...')
return False
test_file.close()
os.remove(test_file.name)
result = True
system = PySpin.System.GetInstance()
# Get current library version
version = system.GetLibraryVersion()
print('Library version: %d.%d.%d.%d' % (version.major, version.minor, version.type, version.build))
# Retrieve list of cameras from the system
cam_list = system.GetCameras()
num_cameras = cam_list.GetSize()
print('Number of cameras detected: %d' % num_cameras)
# Finish if there are no cameras
if num_cameras == 0:
update_cam_status("LEFT_POST","NO")
update_cam_status("TOP_POST","NO")
update_cam_status("BOTTOM_POST","NO")
# Clear camera list before releasing system
cam_list.Clear()
# Release system instance
system.ReleaseInstance()
print('Not enough cameras!')
input('Done! Press Enter to exit...')
return False
# Run example on all cameras
print('Running example for all cameras...')
result = run_multiple_cameras(cam_list)
print('Example complete... \n')
# Clear camera list before releasing system
for cam in cam_list:
camSerialNumber = getSerialNumber(cam)
if camSerialNumber == TOP_DEVICE or camSerialNumber == SIDE2_DEVICE or camSerialNumber == BOTTOM_DEVICE:
cam.Clear()
# Deinitialize camera
cam.DeInit()
# Release system instance
system.ReleaseInstance()
input('Done! Press Enter to exit...')
return result
except Exception as e:
print(e)
camlogger.critical("main error",e)
# subprocess.call(["sh", restartFrameCapturePath])
if __name__ == '__main__':
if main():
sys.exit(0)
else:
sys.exit(1)
Following is my 3D Camera code
#for single instance
from tendo import singleton
me = singleton.SingleInstance()
import logging,time,os,traceback
from typing import Optional
import sys
import cv2
import xml.etree.ElementTree as ET
import subprocess
import datetime
import numpy as np
import shutil
from cx.device import Device
from cx.device_factory import DeviceFactory
import cx.cx_cam as cam
import cx.cx_base as base
device: Optional[Device] = None
topPath = "/home/pwil-server/Insightzz/code/Algorithm/3DCamera/frameCapture/top/"
topTMPPath = "/home/pwil-server/Insightzz/code/Algorithm/3DCamera/frameCapture/top/TMP/"
import pymysql
db_user = "root"
db_host = "localhost"
db_pass = "insightzz@123"
db_name = "PWIL_SURFACE_DEFECT_DB"
logging.basicConfig(filename="frameCapture3D_Top.log",filemode='a',format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
mv_logger=logging.getLogger("frameCapture3D_Top")
mv_logger.setLevel(logging.DEBUG)
mv_logger.debug("CODE STARTED")
processID = os.getpid()
def updateProcessId(processId):
try:
print("This process has the PID", processId)
mv_logger.critical(f'This process has the PID {processId}')
db_update = pymysql.connect(host=db_host, # your host, usually localhost
user=db_user, # your username
passwd=db_pass, # your password
db=db_name)
cur = db_update.cursor()
query = f"UPDATE PROCESS_ID_TABLE set PROCESS_ID = {str(processId)} \
where PROCESS_NAME = 'start3DFrameCapture_Top'"
cur.execute(query)
db_update.commit()
cur.close()
except Exception as e:
print('Exception in updateProcessId: '+str(e))
mv_logger.critical('Exception in updateProcessId: '+str(e))
def grabImage(device):
try:
# 4.1 Grab image buffer, wait for image with optional timeout
dev_buffer = device.waitForBuffer(10000)
if dev_buffer is None:
logging.error("Failed to get buffer")
exit(0)
# 4.2 get image from buffer
# the img object holds a reference to the data in the internal buffer, if you need the image data after cx_queueBuffer you need to copy it!
img = dev_buffer.getImage()
if img is None:
logging.error("Failed get image from buffer")
exit(0)
# copy img.data
img_data = np.copy(img.data)
# 4.3 Queue back the buffer. From now on the img.data is not valid anymore and might be overwritten with new image data at any time!
if dev_buffer.queueBuffer() is False:
logging.error("Failed to queue back buffer")
exit(0)
return img_data
except Exception as e:
print('Exception in grabImage: '+str(e))
mv_logger.critical('Exception in grabImage: '+str(e))
def checkCameraConnection(deviceParam):
try:
print(f"Successfully connected to camera device {deviceParam.DeviceSerialNumber} {deviceParam.DeviceModel} - {deviceParam.DeviceIP}")
except Exception as e:
print(f'Error in checkCameraConnection {deviceParam.DeviceSerialNumber}: {e}')
def setCameraParam(deviceParam,device):
try:
mode=device.getParam("DeviceScanType")
device.setParam("DeviceScanType","Linescan3D")
device.setParam("ExposureTime",20.0)
device.setParam("ComponentSelector","Range")#"Range"
if device.allocAndQueueBuffer(64) is False:
# Further error messages are logged by the Device class
exit(0)
# 3. Start acquisition.
if device.startAcquisition() is False:
exit(0)
except Exception as e:
print(f'Error in checkCameraConnection {deviceParam.DeviceSerialNumber}: {e}')
def main():
try:
updateProcessId(processID)
devices = DeviceFactory.findDevices()
if len(devices) < 1:
print("No cameras found")
exit(0)
print(len(devices))
# Connecting first device via factory returns a Device object
for i,item in enumerate(devices):
if item.DeviceSerialNumber == '21910315':
deviceParamTop = devices[i]
deviceTop = DeviceFactory.openDevice(item.DeviceUri)
checkCameraConnection(deviceParamTop)
setCameraParam(deviceParamTop,deviceTop)
imageToStichTop = []
START = 0
if not os.path.exists(topTMPPath):
os.makedirs(topTMPPath)
print('done')
while True:
try:
t1 = int(time.time())*1000
img_data_top = grabImage(deviceTop)
print(f'time for one grab : {int(time.time())*1000 - t1} ms')
imageToStichTop.append(img_data_top)
if len(imageToStichTop) == 15:
Stiched_image_top = cv2.vconcat(imageToStichTop)
fname=f'tmp.tif'
image_path_top = os.path.join(topPath, fname)
image_path_tmp_top = os.path.join(topTMPPath, fname)
cv2.imwrite(image_path_top,Stiched_image_top)
shutil.copy(image_path_top,image_path_tmp_top)
imageToStichTop = []
except Exception as e:
print('Exception in main while loop: '+str(e))
mv_logger.critical('Exception in main while loop: '+str(e))
except Exception as e:
print('Exception in main : '+str(e))
mv_logger.critical('Exception in main : '+str(e))
if __name__ == '__main__':
main()
- I have tried running the machine vision code seperately and checked for errors. but the code runs smoothly but when 3d code run simultaneously then machine vison code stops.
- Tried upgrading and updating softwares
Expectations : The machine vision code and 3d camera code to run parallerly.