Data Lake: fix corrupted files on Ingestion vs ETL


Objective

I'm building a data lake; the general flow looks like Nifi -> Storage -> ETL -> Storage -> Data Warehouse.

The general rule for a data lake is: no pre-processing at the ingestion stage. All further processing should happen at the ETL stage, so you have provenance over both raw and processed data.

Issue

The source system sends corrupted CSV files. That is, besides the header and data, the first two lines are always free-format metadata we'll never use. Only a single table is affected, and the corrupted CSV is currently used by a single Spark job (let's call it X).
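
For illustration only (the actual content of those two lines is unknown to me; everything below is made up), each file looks roughly like this:

some free-format metadata line we never use
another free-format metadata line we never use
col_a,col_b,col_c
1,foo,0.5
2,bar,1.5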

Question

Is it a good approach to remove those two lines at the Nifi layer? See option 3 under "Workarounds".

Workarounds

  1. Handle the corrupted records inside Spark job X. IMHO this is a bad approach, because we are going to use that file in other tools in the future (data governance schema crawlers, maybe some Athena/ADLA-like engines over ADLS/S3), which means the corrupted-records handling logic would have to be implemented in multiple places (a sketch of what this would mean for each consumer follows the list).
  2. Fix the corrupted files at the ETL layer and store them in a "fixed" layer. All downstream activities (ETL, data governance, MPP engines) would then work only with the "fixed" layer instead of the "raw" layer. Creating a new layer for a single CSV sounds like overhead to me.
  3. Fix the file (remove the first two lines from the CSV) at the Nifi layer. This means the "raw" storage layer will always contain readable data. IMHO this is good because it's simple and the handling logic lives in one place.
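
For context, here is a minimal sketch of what option 1 would mean for every Spark consumer of this file (PySpark; bucket and path names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("job-x").getOrCreate()

# Read the file as plain text and drop the two leading metadata lines.
raw_lines = spark.sparkContext.textFile("s3://datalake-raw/source/table.csv")
data_lines = (raw_lines
              .zipWithIndex()                     # (line, index) pairs
              .filter(lambda pair: pair[1] >= 2)  # skip the first two lines
              .map(lambda pair: pair[0]))

# spark.read.csv also accepts an RDD of CSV strings, so the cleaned lines
# can be parsed with the usual reader options.
df = spark.read.csv(data_lines, header=True, inferSchema=True)

Every other tool reading the raw file would need its own equivalent of this boilerplate, which is exactly the duplication concern above.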

There are 2 answers

Carlos Robles (Best Answer)

First of all, I think your question is brilliant, and from the way you lay out your mental process I'd say you already have your answer.

As you mention

The general rule for a data lake is: no pre-processing at the ingestion stage.

This is the philosophical bottom line, and all the hype grows around this idea, which is easy to oversimplify.

If we check AWS's definition of what a data lake is:

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

It is a basic definition, but let's use it as an "appeal to authority". They clearly say that you can store data "as-is".

  1. My first question is: does "you can" mean strictly "you should"? Also, they mention that it allows you to "run different types of analytics—from dashboards and visualizations to big data processing", etc.
  2. My second question is: if the data is known to be unusable for practically anything... is it legitimate to dump it there anyway?

In the same link, a bit below, they also say:

The main challenge with a data lake architecture is that raw data is stored with no oversight of the contents. For a data lake to make data usable, it needs to have defined mechanisms to catalog, and secure data. Without these elements, data cannot be found, or trusted resulting in a "data swamp." Meeting the needs of wider audiences require data lakes to have governance, semantic consistency, and access controls.

In general, my way of looking at it is that throwing everything in there just to follow the rule of "no preprocessing" is an attempt to be more Catholic than the pope, or maybe a general tendency to oversimplify the rules. I believe that the idea of "as is", and its power, goes more in the direction of not doing data filtering or transformation at ingestion, assuming that we don't really know all the possible future use cases, so having raw data is good and scalable. But that doesn't mean that keeping data we know is corrupted is good; quality is always a requirement for data, and at every stage the data should be at least accessible.

This takes me to the next thought: one oft-repeated idea is that a data lake allows schema-on-read (AWS, Intuit, IBM, O'Reilly). That being so, it makes sense to keep, as much as possible, something with some kind of schema if we don't want to overcomplicate the life of everyone who will potentially want to use it; otherwise we could render it useless in some cases, as the overhead of using it can be discouraging. In fact, the O'Reilly article mentioned above, called "the death of schema-on-read", talks about exactly the complexity added by the lack of governance. So I guess removing some chaos will help the success of the data lake.

By now I think my position is very clear to me (it was not so much when I started writing this response), but I will try to wrap up with one last reference, an article that I have read a few times. Published in gartner.com's press room as early as 2014, it is called "Beware of the Data Lake Fallacy". The whole article is quite interesting, but I will highlight this part:

Data lakes, therefore, carry substantial risks. The most important is the inability to determine data quality or the lineage of findings by other analysts or users that have found value, previously, in using the same data in the lake. By its definition, a data lake accepts any data, without oversight or governance. Without descriptive metadata and a mechanism to maintain it, the data lake risks turning into a data swamp.

I agree with that. It is fun at the beginning: save everything, see your S3 bucket get populated, even run a few queries in Athena or Presto, or some Spark jobs over lots of gzip files, and feel that we live in magic times. But then this small contamination arrives and we accept it, and someday the S3 buckets are not 10 but 100, the small exceptions are not 2 but 20, there are too many things to keep in mind, and things get messier and messier.

Ultimately this is opinion-based, but I would say usable data will make your future self happier.

That said, let's go through your options:

  1. Handle the corrupted records inside Spark job X. You said it yourself: that would be hating yourself and your team, condemning them to do work that could be avoided.

  2. Fix the corrupted files at the ETL layer and store them in a "fixed" layer. You said it: too much overhead. You would be continually tempted to delete the first layer; in fact, I forecast you would end up with a lifecycle policy to get rid of old objects automatically to save cost.

  3. Seems neat and honest. No one can tell you "that is crazy". The only thing you need to make sure of is that the data you will delete is not business-related and that there is no possible future use you cannot foresee now. Even so, I would take an approach that plays it safe:

    • Remove the first two lines from the CSV at the Nifi layer, and save the readable data in the "raw" storage layer.
    • To protect yourself from a "we didn't see this coming" situation, keep a metadata bucket in which you save small files containing those two removed lines, so you can access them in the future if need be, and so you have an answer for anyone who might later say "you shouldn't have deleted that". But I say this only because I cannot imagine what those two lines are; maybe this is total overkill. A sketch of this split follows.
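
A minimal sketch of that split, shown as plain Python purely to illustrate the transformation Nifi would apply (the processor choice, paths, and file names are all hypothetical):

def split_metadata(src_path, clean_path, meta_path):
    # Read the incoming file once.
    with open(src_path, "r", encoding="utf-8") as src:
        lines = src.readlines()

    # The two free-format lines go to a sidecar object in the metadata bucket...
    with open(meta_path, "w", encoding="utf-8") as meta:
        meta.writelines(lines[:2])

    # ...while everything from the real CSV header onwards lands in the raw layer.
    with open(clean_path, "w", encoding="utf-8") as clean:
        clean.writelines(lines[2:])

split_metadata("incoming/table.csv", "raw/table.csv", "meta/table_metadata_lines.txt")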

Personally, I love data lakes, and I love the philosophy behind every system, but I also like to question everything case by case. I have lots of data in flat files, JSON, and CSV, and a lot of production workload based on it. But the most beautiful part of my data lake is not really the purely unprocessed data: we found it extremely powerful to do a first minimal cleanup and, when possible (for data that fundamentally has inserts and not updates), also transform it to Parquet or ORC and even compress it with Snappy. And I can tell you that I really enjoy using that data, even running queries on it directly. Raw data, yes, but usable.

Douglas M

I like the philosophy offered in the accepted answer, but I'd like to provide a more tactical answer...

  • Use the "bad records" handling option on the Spark read, e.g.:
spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .format("csv")
  .load("/input/csvFile.csv")

Reference "Handling bad records and files"

Reference "CSV files"

You can combine this with a .schema(customSchema) option to get a level of schema verification (and better performance) on the read side of your jobs.
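
A hedged sketch of that combination (badRecordsPath is a Databricks-specific option; the schema, column names, and paths below are illustrative and assume an existing SparkSession named spark):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Illustrative schema; the real column names and types come from your source system.
customSchema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("amount", DoubleType(), True),
])

df = (spark.read
      .schema(customSchema)                             # enforce types, skip inference
      .option("header", "true")
      .option("badRecordsPath", "/tmp/badRecordsPath")  # Databricks option: divert unparseable rows
      .format("csv")
      .load("/input/csvFile.csv"))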

  • To perform schema checks on write, take a look at the Delta Lake open source project, which has schema-on-write enforcement and ACID transactions for more reliability (a short sketch follows this list).

  • Managed Delta Lake will let you bin-pack your small files with the OPTIMIZE command (see the Databricks Delta Lake OPTIMIZE command documentation).

    • Because of ACID transactions and bin packing, Spark Structured Streaming and Delta Lake work really well together to continue the streaming data acquisition Nifi is performing.
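
A hedged sketch of the Delta Lake pieces mentioned above (assumes a Databricks or otherwise Delta-enabled Spark environment; the table path is illustrative):

# Append to a Delta table; Delta enforces the table's existing schema on write,
# so a DataFrame with unexpected columns or types fails instead of silently landing.
(df.write
   .format("delta")
   .mode("append")
   .save("/delta/events"))

# Bin-pack small files (managed Delta Lake / recent open source Delta releases).
spark.sql("OPTIMIZE delta.`/delta/events`")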