Time-series data analysis using scientific python: continuous analysis over multiple files

1.8k views Asked by At

The Problem

I'm doing time-series analysis. Measured data comes from the sampling the voltage output of a sensor at 50 kHz and then dumping that data to disk as separate files in hour chunks. Data is saved to an HDF5 file using pytables as a CArray. This format was chosen to maintain interoperability with MATLAB.

The full data set is now multiple TB, far too large to load into memory.

Some of my analysis requires me to iterative over the full data set. For analysis that requires me to grab chunks of data, I can see a path forward through creating a generator method. I'm a bit uncertain of how to proceed with analysis that requires a continuous time series.

Example

For example, let's say I'm looking to find and categorize transients using some moving window process (e.g. wavelet analysis) or apply a FIR filter. How do I handle the boundaries, either at the end or beginning of a file or at chunk boundaries? I would like the data to appear as one continuous data set.

Request

I would love to:

  • Keep the memory footprint low by loading data as necessary.
  • Keep a map of the entire data set in memory so that I can address the data set as I would a regular pandas Series object, e.g. data[time1:time2].

I'm using scientific python (Enthought distribution) with all the regular stuff: numpy, scipy, pandas, matplotlib, etc. I only recently started incorporating pandas into my work flow and I'm still unfamiliar with all of its capabilities.

I've looked over related stackexchange threads and didn't see anything that exactly addressed my issue.

EDIT: FINAL SOLUTION.

Based upon the helpful hints I built a iterator that steps over files and returns chunks of arbitrary size---a moving window that hopefully handles file boundaries with grace. I've added the option of padding the front and back of each of the windows with data (overlapping windows). I can then apply a succession of filters to the overlapping windows and then remove the overlaps at the end. This, I hope, gives me continuity.

I haven't yet implemented __getitem__ but it's on my list of things to do.

Here's the final code. A few details are omitted for brevity.

class FolderContainer(readdata.DataContainer):

    def __init__(self,startdir):
        readdata.DataContainer.__init__(self,startdir)

        self.filelist = None
        self.fs = None
        self.nsamples_hour = None
        # Build the file list
        self._build_filelist(startdir)


    def _build_filelist(self,startdir):
        """
        Populate the filelist dictionary with active files and their associated
        file date (YYYY,MM,DD) and hour.

        Each entry in 'filelist' has the form (abs. path : datetime) where the
        datetime object contains the complete date and hour information.
        """
        print('Building file list....',end='')
        # Use the full file path instead of a relative path so that we don't
        # run into problems if we change the current working directory.
        filelist = { os.path.abspath(f):self._datetime_from_fname(f)
                for f in os.listdir(startdir)
                if fnmatch.fnmatch(f,'NODE*.h5')}

        # If we haven't found any files, raise an error
        if not filelist:
            msg = "Input directory does not contain Illionix h5 files."
            raise IOError(msg)
        # Filelist is a ordered dictionary. Sort before saving.
        self.filelist = OrderedDict(sorted(filelist.items(),
                key=lambda t: t[0]))
        print('done')
    
    def _datetime_from_fname(self,fname):
        """
        Return the year, month, day, and hour from a filename as a datetime
        object
        
        """
        # Filename has the prototype: NODE##-YY-MM-DD-HH.h5. Split this up and
        # take only the date parts. Convert the year form YY to YYYY.
        (year,month,day,hour) = [int(d) for d in re.split('-|\.',fname)[1:-1]]
        year+=2000
        return datetime.datetime(year,month,day,hour)


    def chunk(self,tstart,dt,**kwargs):
        """
        Generator expression from returning consecutive chunks of data with
        overlaps from the entire set of Illionix data files.

        Parameters
        ----------
        Arguments:
            tstart: UTC start time [provided as a datetime or date string]
            dt: Chunk size [integer number of samples]

        Keyword arguments:
            tend: UTC end time [provided as a datetime or date string].
            frontpad: Padding in front of sample [integer number of samples].
            backpad: Padding in back of sample [integer number of samples]

        Yields:
            chunk: generator expression

        """
        # PARSE INPUT ARGUMENTS

        # Ensure 'tstart' is a datetime object.
        tstart = self._to_datetime(tstart)
        # Find the offset, in samples, of the starting position of the window
        # in the first data file
        tstart_samples = self._to_samples(tstart)

        # Convert dt to samples. Because dt is a timedelta object, we can't use
        # '_to_samples' for conversion.
        if isinstance(dt,int):
            dt_samples = dt
        elif isinstance(dt,datetime.timedelta):
            dt_samples = np.int64((dt.day*24*3600 + dt.seconds + 
                    dt.microseconds*1000) * self.fs)
        else:
            # FIXME: Pandas 0.13 includes a 'to_timedelta' function. Change
            # below when EPD pushes the update.
            t = self._parse_date_str(dt)
            dt_samples = np.int64((t.minute*60 + t.second) * self.fs)

        # Read keyword arguments. 'tend' defaults to the end of the last file
        # if a time is not provided.
        default_tend = self.filelist.values()[-1] + datetime.timedelta(hours=1)
        tend = self._to_datetime(kwargs.get('tend',default_tend))
        tend_samples = self._to_samples(tend)

        frontpad = kwargs.get('frontpad',0)
        backpad = kwargs.get('backpad',0)


        # CREATE FILE LIST

        # Build the the list of data files we will iterative over based upon
        # the start and stop times.
        print('Pruning file list...',end='')
        tstart_floor = datetime.datetime(tstart.year,tstart.month,tstart.day,
                tstart.hour)
        filelist_pruned = OrderedDict([(k,v) for k,v in self.filelist.items()
                if v >= tstart_floor and v <= tend])
        print('done.')
        # Check to ensure that we're not missing files by enforcing that there
        # is exactly an hour offset between all files.
        if not all([dt == datetime.timedelta(hours=1) 
                for dt in np.diff(np.array(filelist_pruned.values()))]):
            raise readdata.DataIntegrityError("Hour gap(s) detected in data")


        # MOVING WINDOW GENERATOR ALGORITHM

        # Keep two files open, the current file and the next in line (que file)
        fname_generator = self._file_iterator(filelist_pruned)
        fname_current = fname_generator.next()
        fname_next = fname_generator.next()

        # Iterate over all the files. 'lastfile' indicates when we're
        # processing the last file in the que.
        lastfile = False
        i = tstart_samples
        while True:
            with tables.openFile(fname_current) as fcurrent, \
                    tables.openFile(fname_next) as fnext:
                # Point to the data
                data_current = fcurrent.getNode('/data/voltage/raw')
                data_next = fnext.getNode('/data/voltage/raw')
                # Process all data windows associated with the current pair of
                # files. Avoid unnecessary file access operations as we moving
                # the sliding window.
                while True:
                    # Conditionals that depend on if our slice is:
                    #   (1) completely into the next hour
                    #   (2) partially spills into the next hour
                    #   (3) completely in the current hour.
                    if i - backpad >= self.nsamples_hour:
                        # If we're already on our last file in the processing
                        # que, we can't continue to the next. Exit. Generator
                        # is finished.
                        if lastfile:
                            raise GeneratorExit
                        # Advance the active and que file names. 
                        fname_current = fname_next
                        try:
                            fname_next = fname_generator.next()
                        except GeneratorExit:
                            # We've reached the end of our file processing que.
                            # Indicate this is the last file so that if we try
                            # to pull data across the next file boundary, we'll
                            # exit.
                            lastfile = True
                        # Our data slice has completely moved into the next
                        # hour.
                        i-=self.nsamples_hour
                        # Return the data
                        yield data_next[i-backpad:i+dt_samples+frontpad]
                        # Move window by amount dt
                        i+=dt_samples
                        # We've completely moved on the the next pair of files.
                        # Move to the outer scope to grab the next set of
                        # files.
                        break  
                    elif i + dt_samples + frontpad >= self.nsamples_hour:
                        if lastfile:
                            raise GeneratorExit
                        # Slice spills over into the next hour
                        yield np.r_[data_current[i-backpad:],
                                data_next[:i+dt_samples+frontpad-self.nsamples_hour]]
                        i+=dt_samples
                    else:
                        if lastfile:
                            # Exit once our slice crosses the boundary of the
                            # last file.
                            if i + dt_samples + frontpad > tend_samples:
                                raise GeneratorExit
                        # Slice is completely within the current hour
                        yield data_current[i-backpad:i+dt_samples+frontpad]
                        i+=dt_samples


    def _to_samples(self,input_time):
        """Convert input time, if not in samples, to samples"""
        if isinstance(input_time,int):
            # Input time is already in samples
            return input_time
        elif isinstance(input_time,datetime.datetime):
            # Input time is a datetime object
            return self.fs * (input_time.minute * 60 + input_time.second)
        else:
            raise ValueError("Invalid input 'tstart' parameter")


    def _to_datetime(self,input_time):
        """Return the passed time as a datetime object"""
        if isinstance(input_time,datetime.datetime):
            converted_time = input_time
        elif isinstance(input_time,str):
            converted_time = self._parse_date_str(input_time)
        else:
            raise TypeError("A datetime object or string date/time were "
                    "expected")
        return converted_time


    def _file_iterator(self,filelist):
        """Generator for iterating over file names."""
        for fname in filelist:
            yield fname
1

There are 1 answers

2
Jeff On BEST ANSWER

@Sean here's my 2c

Take a look at this issue here which I created a while back. This is essentially what you are trying to do. This is a bit non-trivial.

Without knowing more details, I would offer a couple of suggestions:

  • HDFStore CAN read in a standard CArray type of format, see here

  • You can easily create a 'Series' like object that has nice properties of a) knowing where each file is and its extents, and uses __getitem__ to 'select' those files, e.g. s[time1:time2]. From a top-level view this might be a very nice abstraction, and you can then dispatch operations.

e.g.

class OutOfCoreSeries(object):

     def __init__(self, dir):
            .... load a list of the files in the dir where you have them ...

     def __getitem__(self, key):
            .... map the selection key (say its a slice, which 'time1:time2' resolves) ...
            .... to the files that make it up .... , then return a new Series that only
            .... those file pointers ....

     def apply(self, func, **kwargs):
            """ apply a function to the files """
            results = []
            for f in self.files:
                     results.append(func(self.read_file(f)))
            return Results(results)

This can very easily get quite complicated. For instance, if you apply an operation that does a reduction that you can fit in memory, Results can simpley be a pandas.Series (or Frame). Hoever, you may be doing a transformation which necessitates you writing out a new set of transformed data files. If you so, then you have to handle this.

Several more suggestions:

  • You may want to hold onto your data in possibly multiple ways that may be useful. For instance you say that you are saving multiple values in a 1-hour slice. It may be that you can split these 1-hour files instead into a file for each variable you are saving but save a much longer slice that then becomes memory readable.

  • You might want to resample the data to lower frequencies, and work on these, loading the data in a particular slice as needed for more detailed work.

  • You might want to create a dataset that is queryable across time, e.g. say high-low peaks at varying frequencies, e.g. maybe using the Table format see here

Thus you may have multiple variations of the same data. Disk space is usually much cheaper/easier to manage than main memory. It makes a lot of sense to take advantage of that.