Applying a function to a pandas DataFrame with millions of rows


I have a pandas DataFrame with 31 columns that can contain millions of rows, and I need to apply a function to each row. When I do that, I get a memory error, so I suspect my approach is inefficient. Saving the same DataFrame with millions of records without applying the function works and finishes quickly; the memory error only appears when I apply the function to each row. I am new to pandas and am refactoring my current code, so I am free to change the _update_trx method to suit this goal. I need to add group_id, year, month, and revision to the DataFrame so that I can bulk insert it into the database. Below is the code I am using:

    def parse(self):
        # read_csv already returns a DataFrame, so no extra pd.DataFrame() wrap is needed
        file_data_df = pd.read_csv(file_name)
        # axis=1 passes each row (not each column) to _update_trx
        file_data_df = file_data_df.apply(self._update_trx, axis=1)
    def _update_trx(self, line):
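        # Named period_date rather than pd so it does not shadow the pandas alias
        period_date = line.get("period_date")
        if not isinstance(period_date, date):
            logging.error("Period date is not an instance of datetime.date.")
            return None

        grp = (self.client_id, line.get("source"), period_date.year, period_date.month)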
        group = Tgroup.query.filter_by(
            client_id=grp[0], source=grp[1], year=grp[2], month=grp[3]
        ).first()
        if self._transaction_groups.get(grp) is None:
            res = (
                Trax.query.with_entities(func.max(Trax.revision))
                .filter_by(client_id=grp[0], source=grp[1], year=grp[2], month=grp[3])
                .group_by(
                    Trax.client_id,
                    Trax.source,
                    Trax.year,
                    Trax.month,
                )
                .first()
            )
            self._transaction_groups[grp] = res[0] + 1 if res else 1

        if group:
            line["group_id"] = group.id
        else:
            trx_group = Tgroup()
            trx_group.client_id = grp[0]
            trx_group.source = grp[1]
            trx_group.year = grp[2]
            trx_group.month = grp[3]
            db.session.add(trx_group)
            db.session.flush()
            line["group_id"] = trx_group.id

        line["year"] = grp[2]
        line["month"] = grp[3]
        line["revision"] = self._transaction_groups[grp]

        return line

I have tried two things. First, I passed the whole DataFrame to _update_trx() and used the columns needed to build the group as lists to query the database. Second, I called _update_trx() directly, converting the data columns to lists and passing them as arguments. Both attempts also failed with a memory error.
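
One direction that might avoid the per-row work, sketched here under assumptions rather than as a tested fix for the code above: derive year and month with vectorized column operations, resolve group_id and revision once per distinct (source, year, month) combination, and merge the result back onto the rows. The names add_group_columns, resolve_group and fake_resolve are hypothetical; resolve_group stands in for the Tgroup / Trax lookups, and the sketch assumes every period_date value parses as a date.

```python
import pandas as pd


def add_group_columns(df, resolve_group):
    """Attach year, month, group_id and revision without a per-row apply.

    resolve_group(source, year, month) is a hypothetical callback returning
    (group_id, revision) for one group; in the question it would wrap the
    Tgroup / Trax queries for the current client.
    """
    out = df.copy()
    # Assumption: every period_date value can be parsed as a date.
    period = pd.to_datetime(out["period_date"])
    out["year"] = period.dt.year
    out["month"] = period.dt.month

    # One lookup per distinct (source, year, month), not one per row.
    keys = out[["source", "year", "month"]].drop_duplicates()
    resolved = [
        resolve_group(source, int(year), int(month))
        for source, year, month in keys.itertuples(index=False, name=None)
    ]
    keys = keys.assign(
        group_id=[group_id for group_id, _ in resolved],
        revision=[revision for _, revision in resolved],
    )

    # Broadcast the per-group values back onto every row.
    # Note: merge returns a fresh RangeIndex, so the original index is not kept.
    return out.merge(keys, on=["source", "year", "month"], how="left")


# Usage with an in-memory stand-in for the database lookups.
calls = []


def fake_resolve(source, year, month):
    calls.append((source, year, month))
    return len(calls), 1  # (group_id, revision)


df = pd.DataFrame(
    {
        "source": ["a", "a", "b"],
        "period_date": ["2023-01-05", "2023-01-20", "2023-02-01"],
    }
)
result = add_group_columns(df, fake_resolve)
print(result)
```

With this shape, the number of lookups grows with the number of distinct groups rather than the number of rows. If the file itself does not fit in memory, pd.read_csv also accepts a chunksize argument that yields the data as a sequence of smaller DataFrames, which could be processed and inserted one at a time.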


There are 0 answers