Data Logging Script Issue - partial data saved in CSV


I have developed a Python script for data logging using Modbus communication. The script's primary purpose is to collect data from a Modbus-connected device, such as RPM, torque, temperature, and pressure, at a high frequency, approximately every 6-9 milliseconds. This data is then saved to a CSV file for later analysis.

I buffer the readings temporarily in a pandas DataFrame to reduce read delay, then write everything to the CSV at the end.

The script appears to work well initially; however, when I run it for an extended period, such as one hour, it falls short of my expectations. Instead of logging continuously at the intended rate, data collection appears to stop after the first 10 minutes or so, leaving only around 117,000 rows in the CSV file. At 6-9 ms per sample, an hour should produce roughly 400,000-600,000 rows, so this is far below the expected collection rate.

I have provided the Python script responsible for data logging. It utilizes the pymodbus library for Modbus communication and the pandas library for data storage. The script connects to the Modbus device, retrieves data at the desired rate, and logs it to a CSV file.

# Imports reconstructed from the code below; the pymodbus import path
# assumes pymodbus 3.x (older releases use pymodbus.client.sync).
import datetime
import ipaddress
import os
import time
import timeit as tt
import tkinter as tk
from threading import Thread
from tkinter import messagebox, ttk

import matplotlib.pyplot as plt
import pandas as pd
from pymodbus.client import ModbusTcpClient


class Data_Logger:
    TIME_FORMAT = "%d-%m-%Y %H:%M:%S.%f"

    def __init__(self, client, ip):
        self.IP = ip
        self.REG_start = 380
        self.RPM = []
        self.timestamps = []
        self.client = client
        self.start_time = datetime.datetime.now()
        self.headers = ['Timestamp', 'RPM',
                       'TORQUE', 'PS',
                       'DY RPM', 'PLUG TEMP',
                       'OIL TEMP', 'PRESSURE ATM',
                       'ENGINE TORQUE', 'FUEL CONSUMPTION',
                       'WET TEMP', 'DRY TEMP',
                       'INTAKE TEMP', 'OIL PRESSURE']
        self.data = pd.DataFrame(columns=self.headers)

    def read_reg(self):
        # Read a block of 49 holding registers starting at address 380.
        response = self.client.read_holding_registers(self.REG_start, 49)
        self.RPM = response.registers

    def timestamp_update(self):
        # Time the Modbus read and advance the running timestamp by that duration.
        elapsed_ms = tt.timeit(self.read_reg, number=1) * 1000.0
        self.start_time += datetime.timedelta(milliseconds=elapsed_ms)
        # Keep every 4th register (13 values) alongside the timestamp.
        row_data = [self.start_time.strftime(self.TIME_FORMAT)] + [self.RPM[i] for i in range(0, 49, 4)]
        new_data = pd.DataFrame([row_data], columns=self.headers)
        # pd.concat copies the whole accumulated frame on every call.
        self.data = pd.concat([self.data, new_data], ignore_index=True)

    def clear_data(self):
        self.RPM = []
        self.timestamps = []
        self.data = pd.DataFrame(columns=self.headers)  # Reinitialize the DataFrame

    def check_connection_quality(self):
        try:
            response = self.client.read_holding_registers(self.REG_start, 1)  # Read a known register
            if response:
                return True  # Successful read, connection is active
        except Exception as e:
            print(f"Connection Quality Check Error\n{e}\nPlease Restart the APP")
        return False  # Unable to read the register, connection issue

    def save(self):
        date = datetime.datetime.now().strftime("%d-%m-%Y")
        base_name = f"{date} Engine Test Bench Data"
        file_name = f"{base_name}.csv"

        if os.path.exists(file_name):
            # Append a counter so an existing file is never overwritten.
            file_number = 1
            while os.path.exists(f"{base_name} ({file_number}).csv"):
                file_number += 1
            self.data.to_csv(f"{base_name} ({file_number}).csv")
        else:
            self.data.to_csv(file_name)


    def plot(self):
        plt.plot(self.data['RPM'])
        # Hide x-axis labels
        plt.xticks([])
        # Set labels and title
        plt.xlabel('Timestamp (ms)')
        plt.ylabel('RPM')
        plt.title('RPM vs. Timestamp')

        # Show the plot
        plt.show()

    def run(self):
        self.timestamp_update()


class App:
    def __init__(self):
        #gui objects
        self.interrupt = False
    def quality_check(self):
        try:
            # Probe the device a few times to confirm reads succeed.
            for i in range(5):
                self.client.read_holding_registers(380, 1)
            return True
        except AttributeError as e:
            messagebox.showerror("Read Error", "Cannot Read Device Register, please restart the logger")
            return False
    def connect(self):
        ip = self.ip_entry.get()
        try:
            # Validate the entered IP address
            ipaddress.IPv4Address(ip)
            self.client = ModbusTcpClient(ip, debug=True)
            self.logger = Data_Logger(client=self.client, ip=ip)
            self.quality_check()
            if self.client.connect():
                self.log("PLC Connected")
                self.status.config(text="PLC Connected", foreground='#00FB28')
                self.start.configure(state="normal")
                self.connect_button.configure(state="disabled")
                self.ip_entry.delete(0, 'end')
                time.sleep(2)
            else:
                self.log("PLC Connection Failed")

        except (ipaddress.AddressValueError, ValueError):
            messagebox.showerror("Invalid IP Address", "Please enter a valid IPv4 address.")
        except Exception as e:
            messagebox.showerror("Connection Error", str(e))


    def start_log(self):
        self.console.delete(0,'end')
        self.p_bar.start()
        self.log("Logging Started")
        self.start.configure(state="disabled")
        self.stop.configure(state="normal")
        try:
            while not self.interrupt:
                self.logger.run()
        except AttributeError as e:
            messagebox.showerror("Modbus I/O exception",f"\nRestart the Logger\n{e}")

    def thread_helper(self):
        thread = Thread(target=self.start_log)
        thread.start()

    def stop_log(self):
        self.interrupt = True
        self.p_bar.stop()
        self.log("Logging Stopped")
        self.log("Saving Data")
        self.p_bar.start()
        save_thread=Thread(target=self.logger.save)
        save_thread.start()
        self.log("Data Saved")
        self.p_bar.stop()
        self.start.configure(state="normal")
        self.stop.configure(state="disabled")
        self.logger.plot()
        self.logger.clear_data()

    def button_init(self):
        self.connect_button = tk.ttk.Button(self.connect_frame, command=self.connect, text="Connect", width=20)
        self.start = tk.ttk.Button(self.control_frame, command=self.thread_helper, text="Start", state="disabled", width=22)
        self.stop = tk.ttk.Button(self.control_frame, command=self.stop_log, text="Stop", state="disabled", width=23)

    def run(self):
        self.main.mainloop()

Data and Environment:

  • Operating System: Windows 10
  • Python Version: 3.9.16
  • Relevant Libraries: pymodbus, matplotlib, pandas
  • Data Source: Modbus communication with a PLC (Programmable Logic Controller)

  • I've tested the script on multiple machines to rule out hardware-related issues. The problem persists across different hardware setups, indicating that the issue is likely not specific to a particular machine or system configuration.

Expectations: When running the Python data logging script, my expectation is to consistently log data at a high frequency, approximately every 6-9 milliseconds. Over the course of an hour, I anticipate a substantial dataset in the CSV file, with hundreds of thousands of rows.

Actual Results: The actual results are not in line with my expectations. While the script initially logs data as anticipated, it inexplicably stops collecting data after the first 10 minutes. This leads to a much smaller dataset than expected, with only around 117,000 rows in the CSV file after an hour.

Additional notes:

  • In a longer test of 2-3 hours, the CSV file was empty, containing only the headers.
  • No errors or exceptions were seen.

1 Answer

Answered by Zach Young:

I save it temporarily in a pandas data frame to reduce read delay then write it to the csv at the end.

I don't believe adding an intermediate step will reduce latency, and Pandas is just plain slow for iterating and appending rows/records of data.
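
To put a rough number on that, here's a minimal, self-contained sketch (the row count and column names are illustrative, not from the original setup) timing per-row pd.concat, which recopies the whole accumulated frame on every append, against plain csv.writer:

import csv
import io
import timeit

import pandas as pd

HEADERS = ["Timestamp", "RPM", "TORQUE"]
ROWS = 10_000  # illustrative row count


def append_with_concat():
    # The question's pattern: one pd.concat per row. Each call copies the
    # entire accumulated frame, so total work grows quadratically with rows.
    data = pd.DataFrame(columns=HEADERS)
    for i in range(ROWS):
        row = pd.DataFrame([[i, i, i]], columns=HEADERS)
        data = pd.concat([data, row], ignore_index=True)


def append_with_csv():
    # One csv.writer row per reading (written to memory here, a file in practice).
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(HEADERS)
    for i in range(ROWS):
        writer.writerow([i, i, i])


print("pd.concat per row :", timeit.timeit(append_with_concat, number=1), "s")
print("csv.writer per row:", timeit.timeit(append_with_csv, number=1), "s")

The csv version should finish orders of magnitude faster, and unlike the concat version its per-row cost stays flat as the log grows.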

Python's standard csv module is plenty fast for encoding CSV data. After that, the limiting factor for your main objective will be the native buffering behavior of Python's file writer. You can mostly get around this by calling flush() on the underlying file object for every row you write. I say "mostly" because a physical write to disk on every flush() call is not guaranteed; see the top comment on this answer.

In isolation that all looks something like:

import csv

# get_measured_values() stands in for your Modbus read returning one row of values.
with open("output.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["ID", "Measured value", "Time monotonic"])
    f.flush()
    while True:
        writer.writerow(get_measured_values())
        f.flush()
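
(As an aside, opening the file with buffering=1 selects line buffering in text mode, which flushes after every newline and would save the explicit flush() calls; the same caveat about the OS deferring the physical write still applies.)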

Here's a complete program that mocks up a producer sending some values into a queue roughly every 7 milliseconds, and a consumer pulling those values off the queue as soon as they are ready and writing them to a CSV. Running tail -f output.csv, I can see the CSV file being updated row by row, multiple rows per second.

import asyncio
import csv
import random
import time

MyQueue = asyncio.Queue[tuple[int, float]]


async def rnd_sleep(t: float):
    """Sleep for t seconds, on average."""
    await asyncio.sleep(t * random.random() * 2)


ITERATIONS = 100_000


async def producer(queue: MyQueue):
    for i in range(1, ITERATIONS):
        token = random.random()
        await queue.put((i, token))
        await rnd_sleep(0.007)
        if i % 1_000 == 0:
            print(f"wrote row {i:>6}")


async def consumer(queue: MyQueue):
    with open("output.csv", "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)

        writer.writerow(["ID", "Measured value", "Time monotonic"])
        f.flush()
        while True:
            i, token = await queue.get()
            writer.writerow([i, token, time.monotonic()])
            f.flush()
            queue.task_done()


async def main():
    queue: MyQueue = asyncio.Queue()
    producers = [asyncio.create_task(producer(queue))]
    asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)


try:
    asyncio.run(main())
except KeyboardInterrupt:
    pass
except:
    raise
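
One caveat with the sketch above: main() returns as soon as the producer finishes, which tears down the still-running consumer task, so any rows left in the queue at that moment are never written. Since the consumer calls queue.task_done() for every row, adding await queue.join() at the end of main() would let the queue drain before the program exits.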

(This was my first time writing async code in Python; I cobbled it together from Using asyncio.Queue for producer-consumer flow.)