I have an NXP board equipped with an NPU, on which a Python script for image processing is executed. The script acquires images and processes them using TensorFlow Lite and the NNAPI delegate. In particular, I have tensorflow-imx installed with Yocto.
The TFLite interpreter is initialized and invoked in the main process. I noticed that when I start a thread that uses subprocess to perform some operations on the filesystem, the TFLite interpreter breaks and from then on always returns the same output. The following script reproduces the problem by randomly loading images and processing them. A thread is started periodically, and this triggers the problem. What could be the reason for this?
import numpy as np
from threading import Thread
from random import random, choice, randint
import tflite_runtime.interpreter as tflite
import time
import cv2
import os
import sys
import psutil
import subprocess
class ClassificationModel(object):
    """Wrapper around a TFLite interpreter for single-image inference.

    The interpreter is created from the model file given at construction
    time and can be rebuilt from the same file with ``reload``.
    """

    def __init__(self, path):
        """Create the interpreter for the model file at ``path``."""
        self.model_path = path
        self._load_interpreter()

    def _load_interpreter(self):
        """Build the interpreter and cache its input/output tensor metadata."""
        self.interpreter = tflite.Interpreter(model_path=self.model_path)
        self.interpreter.allocate_tensors()
        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()
        self.input_shape = self.input_details[0]['shape']

    def predict(self, img, resize=True):
        """Run one inference on ``img`` and return the squeezed first output.

        Args:
            img: image array as produced by ``cv2.imread``.
            resize: when true, resize ``img`` to the model input size first.

        Returns:
            The first output tensor with size-1 dimensions removed.
        """
        if resize:
            # cv2.resize takes (width, height); assumes the input tensor
            # shape is laid out as (batch, height, width, channels) -- TODO confirm.
            img = cv2.resize(img, (self.input_shape[2], self.input_shape[1]))
        # Divide by 255 and cast to float32 before feeding the interpreter.
        img = (img / 255.0).astype(np.float32)
        # Add the leading batch dimension.
        img = np.expand_dims(img, 0)
        self.interpreter.set_tensor(self.input_details[0]['index'], img)
        self.interpreter.invoke()
        output = self.interpreter.get_tensor(self.output_details[0]['index'])
        return np.squeeze(output)

    def reload(self):
        """Discard the current interpreter and rebuild it from ``model_path``.

        The cached tensor metadata is refreshed as well, so that the indices
        used by ``predict`` always belong to the interpreter in use.
        """
        # Drop the old interpreter first so it is released before the new
        # one is allocated.
        del self.interpreter
        self._load_interpreter()
def my_thread_1():
    """Copy ``f1/file.txt`` into ``f2`` five times using ``cp`` subprocesses.

    The five copies are launched together and then waited on, so that every
    child process is reaped and the completion message is printed only after
    all copies have actually finished.
    """
    print("Start threaded task 1")
    # copy one large file
    procs = [subprocess.Popen(["cp", "f1/file.txt", "f2"]) for _ in range(5)]
    # Wait on each child; without this they are never reaped while the
    # script keeps running.
    for proc in procs:
        proc.wait()
    print("Task 1 completed")
if __name__ == '__main__':
    # Reproduction loop: classify randomly chosen sample images forever,
    # occasionally starting the subprocess-based background thread, and
    # reload the model when the prediction stops changing.
    classifier_1 = ClassificationModel('model.tflite')
    file_list = os.listdir('samples')
    if not file_list:
        # choice() would otherwise fail with an opaque IndexError.
        raise SystemExit("No files found in 'samples'")
    prev_pred = 0
    n_equal = 0
    try:
        while True:
            # get image
            img_path = os.path.join('samples', choice(file_list))
            img1 = cv2.imread(img_path)
            if img1 is None:
                # cv2.imread returns None for files it cannot decode.
                print("Skipping unreadable image: {}".format(img_path))
                continue
            # predict
            p1 = classifier_1.predict(img1)[0]
            print(p1)
            # Count consecutive identical predictions.
            if p1 == prev_pred:
                n_equal += 1
            else:
                n_equal = 0
            prev_pred = p1
            if n_equal == 10:
                print("--------tflite broken - reload")
                classifier_1.reload()
                n_equal = 0
            # threaded task
            if random() < 0.05:
                t = Thread(target=my_thread_1)
                t.start()
    except KeyboardInterrupt:
        # Ctrl-C is the intended way to stop the loop.
        pass