Fitting Variable Number of Lorentzian Peaks to a glob of data in text files


So I've gotten the code to work, but it's extremely slow and doesn't give the proper full width at half maximum (FWHM) values. Is there something I can do to get the FWHM values and speed up the processing without losing post-processing quality?

    # -*- coding: utf-8 -*-
    """
    Created on Fri Feb 16 11:38:14 2024
    
    @author: tppoirier96
    """
    
    # importing packages 
    import pandas as pd 
    import glob
    import numpy as np
    from scipy.optimize import leastsq
    import matplotlib.pyplot as plt
    
    folder_path = 'folder_path'
    # Locating folder with spectral data
    file_list = glob.glob(folder_path + "/*.txt")
    # Taking all text files from folder
    main_dataframe = pd.DataFrame(pd.read_table(file_list[0]))
    # Compiling a dataframe for each spectrum
    
    #SpectraType = str(input('What kind of spectral data is this?'))
    
    for i in range(1,len(file_list)):
        # for the remaining files in the glob (the first was already read above)
        data = pd.read_table(file_list[i], sep="\t")
        # Read each text file as dataframe
        df = pd.DataFrame(data)
        # Convert each file to a dataframe
        main_dataframe = pd.concat([main_dataframe, df], axis = 1)
        # Append each dataframe to larger dataframe
    
    DataDF = pd.concat([main_dataframe.iloc[67:,0], main_dataframe.iloc[67:,1::2]], axis=1)
    # Cutting off first 110 wavenumbers due to substantial noise
    # Also removing the wavenumbers for all but the first spectrum
    # Compiling each modified spectrum
    SumDataDF = DataDF.describe()
    # Summary for tracking important numbers
    DataNorm = pd.DataFrame()
    # Initializing new dataframe
    DataLorentz = pd.DataFrame()
    # Initializing new dataframe
    
    
    DataNorm = (DataDF-DataDF.min())/(DataDF.max()-DataDF.min())
    # Normalizing data
    DataNorm = pd.concat([DataDF.iloc[:,0], DataNorm], axis=1)
    # Adding wavenumbers back for visualization
    
    xData = DataNorm.iloc[:,0].array
    # Initializing wavenumber array
    E2GArray = np.zeros((4,len(file_list)))
    # Initializing E2G location array
    
    # Standard Lorentzian function
    def norm_lorentzian(x, x0, a, gamma):
        return a * gamma**2 / ( gamma**2 + ( x - x0 )**2)
    
    # For multiple peaks
    def multi_lorentz(x,params):
        off = params[0]
        paramsRest = params[1:]
        assert not (len(paramsRest )%3)
        return off + sum([ norm_lorentzian( x, *paramsRest[ i : i+3 ] ) for i in range( 0, len( paramsRest ), 3 ) ])
    
    # Least squares regression method
    def res_multi_lorentz( params, xData, yData ):
        diff = [ multi_lorentz( x, params ) - y for x, y in zip( xData, yData ) ]
        return diff
    
    # Begin processing
    for i in range(1,len(DataNorm.columns)-1):
        yData = DataNorm.iloc[:,i].array
        # Converting Spectra intensities to convenient type
        #NumPeaks = int(input('How many peaks to fit?\n'))
        # Collecting number of peaks to fit
        counter = 0
        # Break condition
        generalWidth = 10
        # Starting HWHM
        yDataLoc = yData
        # Duplicating intensities for convenience
        startValues = [max(DataNorm.iloc[:,i])]
        # Taking the maximum (should be 1.0)
        while max( yDataLoc ) - min( yDataLoc ) > .1:
        # I'm not sure about this condition
            counter += 1
            # Break condition increment
            print(str(counter)+ " while")
            if counter > 20:
                print("Reached max iterations!")
                break
            minP = np.argmin( yDataLoc )
            # Index of minimum intensity (what wavenumber the baseline is)
            minY = yData[ minP ]
            # What the minimum intensity is
            x0 = xData[ minP ]
            # Setting the initial E2G position as the minimum
            startValues += [ x0, minY - max( yDataLoc ), generalWidth ]
            # Appending values to feed to lorentz function
            popt, ier = leastsq( res_multi_lorentz, startValues, args=( xData, yData ) )
            # Returns result of optimization and integer flag for solution
            # popt is a concatenation of final values similar to startValues
            # The final popt values are fed to the line below for each spectral intensity set
            yDataLoc = [ y - multi_lorentz( x, popt ) for x, y in zip( xData, yData ) ]
            # Stores the difference between y-values and the Lorentzian curves
            if 1300 < x0 < 1400:
                # If the E2G is found, append it to the array for data compilation
                E2GArray[0,i-1] = i
                # Tracking which data file it came from
                E2GArray[2,i] = x0
                # Storing location of E2G
                E2GArray[3,i] = generalWidth*2
                # Storing FWHM for later
        print(str(i)+" for")
        testData = [multi_lorentz(x,popt) for x in xData]
        # Makes array of lorentzian fits
        DataLorentz = pd.concat([DataLorentz, pd.Series(testData)], axis=1)
    
        fig = plt.figure()
        ax = fig.add_subplot( 1, 1, 1 )
        ax.plot( xData, yData, label="Data" )
        ax.plot( xData, testData, label="Lorentz")
        plt.xlabel("Wavenumber")
        plt.ylabel("Intensity relative to E2G")
        plt.legend(loc="upper left")
        plt.show()
    
    # creating a new csv file with the dataframe we created
    # cwd = os.getcwd()
    # print(cwd)
    #main_dataframe.to_excel('Mapped Data.xlsx')

I would like to be able to process approximately 4000 files quickly; currently it is projected to take about a week to process this data. I would also like to store the position of the main peak (expected between 1300 and 1400, handled in the inner-most if statement) as well as the FWHM, hence the fourth row of E2GArray.
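For reference, with the norm_lorentzian parameterization above each fitted peak occupies an (x0, a, gamma) triple in popt after the leading offset, and its FWHM is simply 2*gamma, so the fitted width can be read straight out of popt rather than stored as generalWidth*2 (which is only the starting guess). A minimal sketch, with main_peak_stats as an illustrative helper name that is not part of the original script:

    # Sketch only: reads the fitted E2G position and FWHM out of popt,
    # where popt = [offset, x0_1, a_1, gamma_1, x0_2, a_2, gamma_2, ...]
    def main_peak_stats(popt, lo=1300, hi=1400):
        # Return (position, FWHM) of the fitted peak whose centre lies in (lo, hi)
        for j in range(1, len(popt), 3):
            x0, a, gamma = popt[j:j + 3]
            if lo < x0 < hi:
                return x0, 2 * abs(gamma)  # FWHM of a Lorentzian is twice its HWHM
        return None                        # no peak found in the window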

1 Answer

Accepted answer by jlandercy

Methodology

There are a few challenges to take into account:

  • Baseline management (remove background to flatten baseline);
  • Peak identification (discover where peaks are located);
  • Peak regression (regress with a variable-peak-count model and an educated initial guess).

Then, automate the process for each file.

MCVE

The following class implements the methodology:

import inspect
import itertools
import pathlib

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pybaselines import Baseline
from scipy import optimize, stats, signal
    
class SpectroscopySolver:

    @staticmethod
    def peak_model(x, loc, scale, height):
        """Lorentzian peak"""
        law = stats.cauchy(loc=loc, scale=scale)
        return height * law.pdf(x) / law.pdf(loc)

    @classmethod
    def m(cls):
        """Number of model parameters"""
        return len(inspect.signature(cls.peak_model).parameters) - 1

    @classmethod
    def model(cls, x, *p):
        """Variable peak model"""
        m = cls.m()
        n = len(p) // m
        y = np.zeros_like(x)
        for i in range(n):
            y += cls.peak_model(x, *p[i * m:(i + 1) * m])
        return y

    @classmethod
    def loss_factory(cls, xdata, ydata):
        """Least Square Factory"""
        def wrapped(p):
            """Least Square Loss function"""
            return 0.5 * np.sum(np.power(ydata - cls.model(xdata, *p), 2))
        return wrapped

    @classmethod
    def fit(cls, xdata, ydata, prominence=400.):
        """Fitting methodology"""

        # Baseline management:
        baseline_fitter = Baseline(x_data=xdata)
        baseline, params = baseline_fitter.asls(ydata)
        yb = ydata - baseline

        # Peak identifications:
        peaks, bases = signal.find_peaks(yb, prominence=prominence)
        lefts, rights = bases["left_bases"], bases["right_bases"]
        
        # Initial guess tuning:
        x0 = list(itertools.chain(*[[xdata[i], 0.1, p] for i, p in zip(peaks, bases["prominences"])]))
        bounds = list(itertools.chain(*[
            [(xdata[lefts[i]], xdata[rights[i]])] + [(0., np.inf)] * (cls.m() - 1) for i in range(len(peaks))
        ]))

        # Least Square Loss Minimization:
        loss = cls.loss_factory(xdata, yb)
        solution = optimize.minimize(loss, x0=x0, bounds=bounds)

        # Estimate:
        yhat = cls.model(xdata, *solution.x)
        ybhat = yhat + baseline
        
        # Plot:
        fig, axe = plt.subplots()
        axe.scatter(xdata, ydata, marker=".", color="gray", label="Data")
        axe.scatter(xdata[peaks], ydata[peaks], label="Peaks")
        axe.plot(xdata, baseline, "--", label="Baseline")
        axe.plot(xdata, ybhat, label="Fit")
        axe.set_title("Spectrogram")
        axe.set_xlabel(r"Wavenumber, $\nu$")
        axe.set_ylabel(r"Raw Signal, $g(\nu)$")
        axe.legend()
        axe.grid()
        
        return axe

Then it suffices to automate the process for each file:

files = list(pathlib.Path("raman/").glob("*.txt"))
solver = SpectroscopySolver()

for file in files:
    
    data = pd.read_csv(file, sep="\t", header=None, skiprows=100, names=["x", "y"])
    solver.fit(data.x, data.y)

This performs relatively well with regard to your datasets.

[Figures: fitted spectrograms for three example files, each showing the raw data, detected peaks, baseline, and fit.]
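As written, fit only returns the Matplotlib axis. To also capture the main-peak position and FWHM asked about in the question, one option is sketched below, assuming fit is modified to end with return axe, solution and reusing files and solver from the driver loop above; summarize_peaks is an illustrative helper, and for a Lorentzian (Cauchy) peak the FWHM is twice the scale parameter:

# Sketch only: assumes SpectroscopySolver.fit is changed to `return axe, solution`
import numpy as np
import pandas as pd

def summarize_peaks(solution, m=3):
    # Unpack the flat parameter vector into one row per fitted peak
    p = np.asarray(solution.x).reshape(-1, m)           # columns: loc, scale, height
    peaks = pd.DataFrame(p, columns=["loc", "scale", "height"])
    peaks["fwhm"] = 2 * peaks["scale"]                  # Lorentzian FWHM = 2 * scale
    return peaks

results = []
for file in files:
    data = pd.read_csv(file, sep="\t", header=None, skiprows=100, names=["x", "y"])
    axe, solution = solver.fit(data.x, data.y)          # modified return value (assumption)
    peaks = summarize_peaks(solution)
    main = peaks[peaks["loc"].between(1300, 1400)]      # the E2G window from the question
    results.append(main.assign(file=file.name))

summary = pd.concat(results, ignore_index=True)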

Tuning

You can control the methodology by:

  • Choosing the appropriate baseline fitter (see pybaselines);
  • Adapting the educated initial guess to better drive the gradient descent;
  • Refining the bounds to enforce better convergence;
  • Filtering peaks by prominence;
  • Adding a minus sign to the peak model if you are fitting downward peaks (see the sketch below).
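A rough illustration of the last two knobs, reusing the class above and a data frame loaded as in the driver loop (DownwardSolver is a hypothetical name, not part of the answer):

# Illustrative sketch of two of the tuning knobs applied to the class above.
from scipy import stats

solver = SpectroscopySolver()
solver.fit(data.x, data.y, prominence=200.)   # lower prominence keeps weaker peaks

class DownwardSolver(SpectroscopySolver):
    """Variant with the minus sign mentioned above, for downward (dip-like) peaks."""

    @staticmethod
    def peak_model(x, loc, scale, height):
        law = stats.cauchy(loc=loc, scale=scale)
        return -height * law.pdf(x) / law.pdf(loc)  # peaks point downward

    # Note: the peak-detection step in fit() would also need to look at the
    # inverted signal (e.g. signal.find_peaks(-yb, ...)) so that dips are found.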