# Let's Data : Focus on the data - we'll manage the infrastructure!
Cloud infrastructure that simplifies how you process, analyze and transform data.
Big Data: Building a Document Index From Web Crawl Archives
Abstract
In this big data case study, we processed the Common Crawl Web Archives files using #Let's Data. Common Crawl (https://commoncrawl.org/) is an open repository of web crawl data and a fantastic resource for web-scale crawl data.
We used #Let's Data compute to reduce the web archives to JSON documents that could be used to create a database index. We processed ~219K files (~477 TB of S3 data) in 48 hours at a nominal cost of $5 per TB-Hour.
Problem Definition
Big Data datasets are generally huge and spread across a large number of files. Processing these files at high scale and in a reasonable amount of time requires a data pipeline, which can be a significant infrastructure engineering effort, is rife with infrastructure costs and can take many man-months to build and perfect.
In this Big Data use case, we want to process the Common Crawl Web Archives files (219K files, 477 TB of uncompressed data) and transform this semi-structured data into structured JSON documents that can be used to create a database index. Such an effort requires an understanding of the data domain (the Common Crawl Web Archive formats), reliable and fault-tolerant compute infrastructure, maintainability, operability and non-trivial compute code. For example, the compute code needs to deal with transient failures, rate limiting, batching for performance, ordering and deduplication, checkpointing and failure restarts. Logging, metrics and diagnostics infrastructure also need to be built in.
Building such a data pipeline on #Let's Data eliminates these infrastructure requirements. The #Let’s Data promise is that enterprises should "Focus on the data, we'll manage the infrastructure". We used #Let’s Data to process the Common Crawl Web Archives dataset: the system processed 219K files (~477 TB of data) in ~48 hours and extracted ~3 billion JSON documents, which roughly translates to 17K documents per second! #Let’s Data simplified the creation and management of this data compute pipeline using AWS services, reduced development time and costs, and enabled high performance, availability and elastic scale.
Solution & Architecture
There are two different types of development effort needed for such a Big Data use case:
- The Functional Data Model: Understanding the data formats of the big data functional domain and developing how to parse the data and extract output documents.
- The Data Pipeline Infrastructure: This is the infrastructure code that is required to orchestrate the data pipeline, reading from the source, writing to the destination, scheduling computation tasks and data jobs, tracking errors and building in fault tolerance and the necessary diagnostics.
In traditional data pipeline development, one would spend a disproportionately large development effort in developing the data pipeline infrastructure. With #Let’s Data, the focus is mostly on developing the functional data model, with only an integration effort to orchestrate and run the data pipeline.
Let’s look at each of these development efforts in detail.
Common Crawl Data Model
The Common Crawl Dataset has the following characteristics:
- it has three file types: the Archive, Metadata and Conversion files
- each data record (crawled link) has data that is spread across these three files:
- the archive file has the http request and response with some high level metadata
- the metadata file has the metadata about the records in the archive file such as record types and their record offsets etc.
- the conversion file has the converted HTML document
- each of these files follows a record state machine for each data record (crawled link) – for example,
- the archive file state machine is REQUEST -> RESPONSE -> METADATA for each crawled link
- the metadata file state machine is METADATA (Request) -> METADATA(Response) -> METADATA(Metadata) for each crawled link (remember that this is metadata about the archive file records)
- the conversion file state machine is simple – a single CONVERSION record for each crawled link
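The archive file state machine described above can be sketched as a small validator. This is an illustrative Python sketch only: the actual implementation uses the #Let’s Data Java interfaces, and the names `ArchiveRecord`, `ARCHIVE_SEQUENCE` and `group_by_crawled_link` are hypothetical.

```python
from enum import Enum


class ArchiveRecord(Enum):
    """Record types in the archive file (hypothetical names for illustration)."""
    REQUEST = "REQUEST"
    RESPONSE = "RESPONSE"
    METADATA = "METADATA"


# Expected order of record types for one crawled link in the archive file.
ARCHIVE_SEQUENCE = [ArchiveRecord.REQUEST, ArchiveRecord.RESPONSE, ArchiveRecord.METADATA]


def group_by_crawled_link(record_types):
    """Group a flat stream of record types into one triple per crawled link.

    Raises ValueError if the stream deviates from REQUEST -> RESPONSE -> METADATA.
    """
    groups = []
    current = []
    for record_type in record_types:
        expected = ARCHIVE_SEQUENCE[len(current)]
        if record_type is not expected:
            raise ValueError(f"expected {expected.name}, got {record_type.name}")
        current.append(record_type)
        if len(current) == len(ARCHIVE_SEQUENCE):
            groups.append(tuple(current))
            current = []
    if current:
        raise ValueError("stream ended in the middle of a crawled link")
    return groups
```

The metadata and conversion files would follow the same pattern with their own sequences (three METADATA records, or a single CONVERSION record, per crawled link).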
With this high-level information, we do the following development tasks:
- The POJOs: create Java POJOs that map to each record type. This is the majority of the work: you define how to create an object from a byte array and how to validate the integrity of the object.
- The Parsers: define a parser state machine for each of the files using the #Let’s Data interfaces. This is relatively simple: you encode the record types as a state machine and specify the start and end delimiters for each record.
- The Reader: define a reader that constructs an output document from these file parser state machines using the #Let’s Data interface. This is the simplest of the three: you encode the record retrieval logic from the parsers and then construct an output record by combining the retrieved records.
We’ve shared our implementation of the Common Crawl model in the GitHub repository: https://github.com/lets-data/letsdata-common-crawl
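To make the reader step concrete, here is a minimal Python sketch of combining one record from each of the three files into a single JSON output document. It is not the code from the repository (which is Java): the classes `ArchiveEntry`, `MetadataEntry` and `ConversionEntry`, their fields and the function `build_index_document` are all hypothetical.

```python
import json
from dataclasses import dataclass


@dataclass
class ArchiveEntry:
    """Hypothetical: request/response summary from the archive file."""
    url: str
    status: int


@dataclass
class MetadataEntry:
    """Hypothetical: metadata about the archive records."""
    url: str
    content_type: str


@dataclass
class ConversionEntry:
    """Hypothetical: converted document from the conversion file."""
    url: str
    text: str


def build_index_document(archive, metadata, conversion):
    """Combine the three per-file records for one crawled link into a JSON document."""
    urls = {archive.url, metadata.url, conversion.url}
    if len(urls) != 1:
        # Integrity check: all three records must describe the same crawled link.
        raise ValueError(f"records belong to different crawled links: {sorted(urls)}")
    return json.dumps(
        {
            "url": archive.url,
            "status": archive.status,
            "contentType": metadata.content_type,
            "text": conversion.text,
        },
        sort_keys=True,
    )
```

For example, `build_index_document(ArchiveEntry("https://example.org", 200), MetadataEntry("https://example.org", "text/html"), ConversionEntry("https://example.org", "Hello"))` returns one JSON string, while records with mismatched URLs are rejected.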
#Let’s Data Data-Pipeline
With the above Common Crawl data model, we can now orchestrate the data pipeline simply by specifying the dataset configuration. We create a pipeline that reads the Common Crawl dataset files from AWS S3, uses AWS Lambda to run the parser and extraction code, and writes the extracted documents to AWS Kinesis. We also do some access setup so that #Let’s Data can automatically manage the read and write resources.
Here are the dataset configuration details:
- Read Connector configuration:
- the S3 Bucket to read from
- the JAR file that has the #Let’s Data interface implementations
- the mapping of #Let’s Data interfaces to file types (archive file type -> archive file parser class name etc.)
- Write Connector Configuration:
- the Kinesis stream that we need to write to
- the number of shards for the Kinesis stream
- Error Connector Configuration:
- the S3 Bucket to write the error records to
- Compute Engine Configuration:
- AWS Lambda compute details – these are the function concurrency, timeout, memory and log level
- Manifest file:
- the manifest file that defines the list of all the files that should be processed and their mapping
- Each line in the manifest file becomes a #Let’s Data task that can be tracked from creation to completion and has its own progress, errors and diagnostics tracing.
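The shape of such a configuration can be sketched as a plain Python dictionary with a small completeness check. Every key name below is hypothetical and does not reflect the actual #Let’s Data configuration schema; values in angle brackets are placeholders. The mapping of file types to parser classes is an assumption based on the parser class names mentioned in this case study.

```python
# All key names are hypothetical; they mirror the bullets above,
# not the actual #Let's Data configuration schema.
dataset_config = {
    "readConnector": {
        "bucketName": "<source-s3-bucket>",
        "artifactJar": "<jar-with-interface-implementations>",
        # Assumed mapping of file types to the parser classes named in this case study.
        "parserClasses": {
            "archive": "WARCFileParser",
            "metadata": "WATFileParser",
            "conversion": "WETFileParser",
        },
        "readerClass": "CommonCrawlReader",
    },
    "writeConnector": {
        "kinesisStreamName": "<stream-name>",
        "shardCount": "<number-of-shards>",
    },
    "errorConnector": {"bucketName": "<error-s3-bucket>"},
    "computeEngine": {
        "lambdaConcurrency": 500,  # concurrency reported in the Results section
        "timeoutSeconds": "<timeout>",
        "memoryMb": "<memory>",
        "logLevel": "<log-level>",
    },
    "manifestFile": "<manifest-file-location>",
}

REQUIRED_SECTIONS = (
    "readConnector",
    "writeConnector",
    "errorConnector",
    "computeEngine",
    "manifestFile",
)
FILE_TYPES = ("archive", "metadata", "conversion")


def find_config_problems(config):
    """Return a list of human-readable problems; an empty list means the sketch is complete."""
    problems = [f"missing section: {name}" for name in REQUIRED_SECTIONS if name not in config]
    parsers = config.get("readConnector", {}).get("parserClasses", {})
    problems += [f"no parser mapped for file type: {ft}" for ft in FILE_TYPES if ft not in parsers]
    return problems
```

A check like this only confirms that each of the five configuration areas is present and that every file type has a parser; it says nothing about whether the values are valid for the real service.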
We use the #Let’s Data CLI to create this dataset and monitor its execution via the CLI and Console.
Results
We ran this common crawl use-case on #Let’s Data to test the limits of our infrastructure and were pleasantly surprised by the staggering scale we were able to achieve at nominal costs. Here are some results at a glance:
- The system processed 73,220 of the 80,000 tasks in ~48 hours
- Tasks executed on AWS Lambda with a concurrent execution of 500 Tasks
- 219K files processed, read 477 TB of uncompressed data from S3, wrote 13 TB to AWS Kinesis.
- Extracted ~3 billion records that were written to the AWS Kinesis stream. ~16 million error records (0.5% errors) were written to AWS S3. (To put the 3 billion number into perspective, Google processes around 8.5 billion searches per day. Source)
- The system peaked at reading 455 GB per minute from S3 and writing 12.36 GB per minute to AWS Kinesis, extracting 2.7 million records per minute (~45K records per second!)
- The cost for the dataset was $36,000: approximately $75 per TB (uncompressed) and $1.67 per TB-Hour
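Several of the headline rates follow directly from the totals reported above. The short Python sketch below reproduces that arithmetic from the rounded figures in this section, so its results are approximate; the variable names are our own.

```python
SECONDS_PER_HOUR = 3600

# Rounded totals taken from the results listed above.
records = 3_000_000_000
error_records = 16_000_000
run_hours = 48
data_tb = 477
total_cost_usd = 36_000
peak_records_per_minute = 2_700_000

# Average throughput over the whole run.
avg_records_per_second = records / (run_hours * SECONDS_PER_HOUR)
# Peak throughput, converted from per-minute to per-second.
peak_records_per_second = peak_records_per_minute / 60
# Error records relative to extracted records.
error_rate_percent = 100 * error_records / records
# Cost per uncompressed terabyte read.
cost_per_tb_usd = total_cost_usd / data_tb

print(f"average: {avg_records_per_second:,.0f} records/s")   # average: 17,361 records/s
print(f"peak:    {peak_records_per_second:,.0f} records/s")  # peak:    45,000 records/s
print(f"errors:  {error_rate_percent:.2f} %")                # errors:  0.53 %
print(f"cost:    ${cost_per_tb_usd:.2f} per TB")             # cost:    $75.47 per TB
```

The peak rate is roughly 2.6 times the run-wide average, which is consistent with throughput varying over the course of the run.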
TCO Analysis
Developing on the #Let’s Data infrastructure has huge cost and time savings – here is a side by side comparison:
This is a 5X reduction!
How we built it
- Code the Common Crawl Web Archive data model classes. Here is the common-crawl model that we built
- Implement 3 #Let's Data Parser Interfaces: WARCFileParser, WATFileParser, WETFileParser for the Common Crawl File Types
- Implement 1 Reader Interface: CommonCrawlReader with logic to combine extracted records into an index document.
- Implement SingleDocInterface: IndexRecord and CompositeDocInterface CompositeIndexRecord
- Create a manifest file to define the different tasks we want processed in the dataset. Here is the script we used to create the manifest file
- Create a dataset job on #Let's Data. Here are the CLI commands to create the dataset: Create Dataset
- Monitor the dataset execution
- Monitoring task progress: List Tasks
- Viewing metrics - see the example dashboard above. The raw data can be obtained via View Metrics
- Viewing task execution logs: View Logs
- Monitoring errors: View Errors
- Re-driving error tasks on #Let's Data: Tasks Redrive
Lessons Learnt
Lessons Learnt This case study validated our engineering MVP – we can process large datasets at scale with a large reduction TCO. The case study also did find issues that we fixed:
- Simplified dataset configuration by removing redundant or unneeded fields
- Schema fixes where data partitioning was not working effectively for large datasets
- Initialization workflow fixes – 80K task ingestion caused our initialization workflow to time out
- Enable Log Levels – verbose logging was enabled during the test run, which resulted in a larger than expected amount of logs (and costs)
- Token Sizes – we generate tokens dynamically on each API call; with 80K tasks, the difference between a 1024-byte pagination token and a 512-byte pagination token quickly adds up. We were hitting API response size limits, and such reductions doubled the number of results we could return in each page.
- Manually tweaked the AWS Kinesis stream's shard scaling during the run, which accounts for the difference in throughput and latencies during the run – we need to write a dynamic optimizer (scaler / descaler)
- This was the first real large scale test of the system - while the system performed really well, we made a large number of fit and finish fixes across the stack