Batch processing

So far we have looked in detail at requests and queries, and at the responses and results they produce.

With the web and RESTful APIs being so common, we take this request/response approach for granted and often forget that there are other ways of dealing with data:

Services (online): A service waits for a request from a client to arrive, then tries to handle it as fast as possible and sends a response back. Services need to be always up and fast: uptime and response time are their key measures.

Batch processing systems (offline): These are usually scheduled jobs that run periodically. They are said to be offline because there is generally no client waiting for them to complete. A batch job takes a (generally large) amount of input data, processes it, and produces output data. Their key measure is throughput: the time it takes to process an input dataset of a given size.

Stream processing systems: Somewhere in between online and offline processing. Like batch jobs, stream processors take input and produce output data without responding to requests. But they are also online in the sense that they actively listen for new incoming data and process it shortly after it arrives, rather than running periodically. Because of this "micro-batch" / online approach, they are also called real-time or near real-time systems.

Batch Processing with Unix Tools

Simple Log Analysis

Example: finding the five most requested URLs in a web server access log:

$ cat /var/log/nginx/access.log |  # 1
  awk '{print $7}' |               # 2
  sort             |               # 3
  uniq -c          |               # 4
  sort -r -n       |               # 5
  head -n 5                        # 6
  1. Read the log file
  2. Split each line on whitespace and print the 7th field (the requested URL)
  3. Sort the URLs alphabetically, so that identical URLs end up on adjacent lines
  4. Collapse adjacent duplicate lines into one and prefix each with its count (-c)
  5. Sort numerically by that count (-n), in reverse order (-r)
  6. Keep only the first 5 lines

This can be done by chaining (piping) these Unix commands together, or with a custom program. A batch job would then run periodically, execute the analysis and save the output somewhere.
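
As a rough illustration of the "custom program" option, here is a minimal Python sketch of the same analysis. The function name top_urls and the sample log lines are made up for this example, and it assumes the URL is the 7th whitespace-separated field, as in the pipeline above.

```python
from collections import Counter

def top_urls(lines, n=5):
    """Return the n most requested URLs as (url, count) pairs."""
    counts = Counter()
    for line in lines:
        fields = line.split()        # like awk: split on whitespace
        if len(fields) >= 7:         # ignore lines that are too short
            counts[fields[6]] += 1   # awk's $7 is index 6 in Python
    return counts.most_common(n)     # sorted by count, largest first

# Hypothetical sample lines shaped like a typical access log entry
sample_log = [
    '10.0.0.1 - - [27/Feb/2015:17:55:11 +0000] "GET /css/site.css HTTP/1.1" 200 3377',
    '10.0.0.2 - - [27/Feb/2015:17:55:12 +0000] "GET /index.html HTTP/1.1" 200 512',
    '10.0.0.1 - - [27/Feb/2015:17:55:13 +0000] "GET /index.html HTTP/1.1" 200 512',
]

if __name__ == "__main__":
    for url, count in top_urls(sample_log):
        print(count, url)
```

Unlike the pipeline, this version keeps one counter per distinct URL in memory instead of sorting the whole input, which is fine as long as the set of distinct URLs is small. To run it on a real log, pass an open file object instead of sample_log.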

Each of these commands reads its input from stdin (standard input) and writes its output to stdout (standard output).

The Unix Philosophy

  1. Make each program do one thing well. To do a new job, build afresh rather than complicate old programs by adding new “features”.
  2. Expect the output of every program to become the input to another, as yet unknown, program. Don’t clutter output with extraneous information. Avoid stringently columnar or binary input formats. Don’t insist on interactive input.
  3. Design and build software, even operating systems, to be tried early, ideally within weeks. Don’t hesitate to throw away the clumsy parts and rebuild them.
  4. Use tools in preference to unskilled help to lighten a programming task, even if you have to detour to build the tools and expect to throw some of them out after you’ve finished using them.

Sound familiar? These principles were written down in 1978 by Doug McIlroy, the inventor of Unix pipes, who had already described the idea of connecting programs together back in 1964 (!!!).

MapReduce and Distributed Filesystems

MapReduce resembles the Unix tools described above, but distributed across many machines.

MapReduce jobs read and write files on a distributed filesystem, which is their equivalent of stdin and stdout. Distributed filesystems, and object storage services that play a similar role, include:

  • HDFS (Hadoop Distributed File System)
  • GFS (Google File System)
  • GlusterFS
  • QFS
  • Amazon S3
  • Azure Blob Storage
  • OpenStack Swift
  • etc...

How it works (example with HDFS): A daemon process (DataNode) runs on each machine and exposes a network service that allows other nodes to access files stored on that machine. A central server (NameNode) keeps track of which file blocks are stored on which DataNode. File blocks are replicated on multiple machines for fault tolerance.
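
The bookkeeping role of the NameNode can be pictured with a toy model. This is only a sketch: the class ToyNameNode, its methods and the round-robin placement are invented for illustration and are not how HDFS actually chooses replica locations.

```python
import itertools

class ToyNameNode:
    """Toy model: remembers which DataNodes hold a replica of each block."""

    def __init__(self, datanodes, replication=3):
        self.datanodes = list(datanodes)
        # Cannot keep more distinct replicas than there are machines
        self.replication = min(replication, len(self.datanodes))
        self.block_locations = {}  # (filename, block_index) -> DataNode names
        self._next_start = itertools.cycle(range(len(self.datanodes)))

    def add_file(self, filename, num_blocks):
        """Assign replicas for every block (round-robin, an assumption)."""
        for index in range(num_blocks):
            start = next(self._next_start)
            replicas = [
                self.datanodes[(start + k) % len(self.datanodes)]
                for k in range(self.replication)
            ]
            self.block_locations[(filename, index)] = replicas

    def locate(self, filename, block_index, failed=()):
        """Return the DataNodes that can still serve this block."""
        return [
            node
            for node in self.block_locations[(filename, block_index)]
            if node not in failed
        ]
```

With a replication factor of 2, a block remains readable after one of its two DataNodes fails, because locate still returns the other replica.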

MapReduce Job Execution

Read the steps below alongside the Simple Log Analysis example above.

  1. Read a set of input files, and break them up into records (cat)
  2. Call the map function to extract key/value pairs from each record (awk: $7, the 7th field, is the key; no value is associated with it)
  3. Sort by key (sort)
  4. Call reduce function to iterate over sorted key/value pairs. (uniq -c: count)

These four steps make up one MapReduce job. The mapper and reducer functions are custom (you write them); the other two steps are handled by the MapReduce framework.

Mapper: Called once for every input record. It does not keep state from one record to the next. Its job is to extract any number of key/value pairs from the input.

Reducer: Called once for every key produced by the mappers. Its input is the key and the collection of values for this key. The reducer can then process these values, for example by counting them, which gives the count per key as in our example.
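
To make the two callbacks concrete, here is a minimal single-process sketch in Python. The names map_record, reduce_key and run_job are hypothetical and do not come from any real MapReduce API; run_job merely imitates, on one machine, the read/sort/group steps that the framework would perform.

```python
from itertools import groupby
from operator import itemgetter

def map_record(line):
    """Mapper: emit (url, None) for one log line; no state between calls."""
    fields = line.split()
    if len(fields) >= 7:
        yield fields[6], None       # key is the URL, no value is attached

def reduce_key(url, values):
    """Reducer: receives one key and an iterator over all of its values."""
    yield url, sum(1 for _ in values)   # count occurrences, like uniq -c

def run_job(records, mapper, reducer):
    """Stand-in for the framework: map every record, sort by key, reduce."""
    pairs = [pair for record in records for pair in mapper(record)]
    pairs.sort(key=itemgetter(0))                        # the "sort" step
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        values = (value for _, value in group)
        output.extend(reducer(key, values))
    return output
```

Calling run_job(lines, map_record, reduce_key) on a list of log lines returns one (url, count) pair per distinct URL, ordered by URL.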

Distributed execution

Map and Reduce tasks are both distributed on multiple machines.

  1. The job's code is first copied to the different machines
  2. Map tasks start, and the mappers begin reading input files from the distributed filesystem. There are as many map tasks as input file blocks. (The number of reduce tasks is configured by the job's author.)
  3. Each map task reads its input and passes the records one at a time to the map function
  4. The map output is partitioned by reducer, sorted by key and written to the local disk of the mapper's machine. The MapReduce scheduler then notifies the reducers
  5. Reduce tasks start by connecting to each of the mappers and downloading the files for their own partition (this step is known as the shuffle)
  6. Each reducer then merges the files it fetched, preserving the sort order
  7. Finally, the reduce function is called with a key and an iterator over its values, and the output records are written to the distributed filesystem (usually one copy on the local machine, with replicas on other machines)
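
The partitioning, shuffle and merge steps above can be imitated in a single process. The sketch below is only an illustration under simplifying assumptions: lists stand in for files and machines, a CRC32 hash of the key selects the reducer, and all function names are invented for this example.

```python
import heapq
import zlib
from itertools import groupby
from operator import itemgetter

def url_mapper(line):
    fields = line.split()
    if len(fields) >= 7:
        yield fields[6], None

def count_reducer(url, values):
    yield url, sum(1 for _ in values)

def partition_for(key, num_reducers):
    """Equal keys always hash to the same reducer, whichever mapper saw them."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def run_map_task(block, mapper, num_reducers):
    """One map task: returns one key-sorted list of pairs per reducer."""
    partitions = [[] for _ in range(num_reducers)]
    for record in block:
        for key, value in mapper(record):
            partitions[partition_for(key, num_reducers)].append((key, value))
    for partition in partitions:
        partition.sort(key=itemgetter(0))
    return partitions

def run_reduce_task(sorted_runs, reducer):
    """One reduce task: merge the sorted runs, then reduce key by key."""
    merged = heapq.merge(*sorted_runs, key=itemgetter(0))  # keeps sort order
    output = []
    for key, group in groupby(merged, key=itemgetter(0)):
        output.extend(reducer(key, (value for _, value in group)))
    return output

def run_distributed(input_blocks, mapper, reducer, num_reducers=2):
    """One map task per input block, then one reduce task per partition."""
    map_outputs = [run_map_task(b, mapper, num_reducers) for b in input_blocks]
    results = []
    for r in range(num_reducers):
        fetched = [output[r] for output in map_outputs]  # the shuffle
        results.append(run_reduce_task(fetched, reducer))
    return results
```

Each element of the returned list corresponds to the output file of one reducer. Because the partition is derived from the key alone, a given URL is counted by exactly one reducer.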

Workflows

As you may have guessed already, a single MapReduce job cannot perform all the operations from our Simple Log Analysis example, since that example involves a second sort. With one MapReduce job we can count the occurrences of each URL in the nginx logs, but not order the URLs by that count.

We can, however, use this output as the input of another MapReduce job, which is then responsible for the sorting. Such a chain of jobs is called a workflow, and it plays a role similar to a pipeline of Unix commands.
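
As a minimal sketch of such a workflow, the Python code below chains two simulated jobs: the first counts URLs, the second re-keys the records by count so that the framework's sort orders them. The helper run_job and all other names are hypothetical and stand in for a real framework.

```python
from itertools import groupby
from operator import itemgetter

def run_job(records, mapper, reducer):
    """Single-process stand-in for one MapReduce job: map, sort, reduce."""
    pairs = [pair for record in records for pair in mapper(record)]
    pairs.sort(key=itemgetter(0))
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        output.extend(reducer(key, (value for _, value in group)))
    return output

# Job 1: count the occurrences of each URL
def url_mapper(line):
    fields = line.split()
    if len(fields) >= 7:
        yield fields[6], None

def count_reducer(url, values):
    yield url, sum(1 for _ in values)

# Job 2: use the count as the key so that the sort step orders by it
def count_as_key_mapper(record):
    url, count = record
    yield -count, url        # negated, so the ascending sort puts large counts first

def emit_reducer(negated_count, urls):
    for url in urls:
        yield url, -negated_count

def top_urls_workflow(lines, n=5):
    counts = run_job(lines, url_mapper, count_reducer)   # job 1 finishes first
    ranked = run_job(counts, count_as_key_mapper, emit_reducer)
    return ranked[:n]
```

Note that the second job only starts once the first has produced its complete output, which mirrors how jobs in a workflow depend on each other.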

A job must complete successfully before its output can be used as input by another job. Various higher-level tools for Hadoop, such as Pig, Hive, Cascading and Crunch, also set up workflows of multiple MapReduce stages that are automatically wired together appropriately.

Reduce-Side Joins and Grouping

In many datasets, it is common for one record to have an association with another: a foreign key in a relational model, a document reference in a document model, or an edge in a graph model. A join is required whenever you need to access the records on both sides of that association.

Example: Activity of users

  • Left: a log of what logged-in users did on a website. It refers to users by user_id only
  • Right: a database of registered users. It contains all the information about users, including their user_id

So what if we want to correlate activity and date of birth?

Embedding the whole user profile in the activity log is not realistic, so we need some sort of join. There are two options:

  • Query the user database for every record in the mapper. This is not very efficient, as every lookup is a network round trip. It is better for the job to work locally and on deterministic (unchanging) input files.
  • Take a copy of the user database with an ETL job and put it in the same distributed filesystem as the activity log, so that MapReduce can use both sets of files (log and users) as inputs.
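
With both datasets available as input files, the join itself can happen in the reducer. The Python sketch below shows the general idea on one machine: each mapper emits user_id as the key and tags its value with its origin, so that after sorting the reducer sees a user's profile and activity together. The field names url and date_of_birth and all function names are assumptions made for this example.

```python
from itertools import groupby
from operator import itemgetter

def user_mapper(user):
    """Mapper for the user database extract: key by user_id, tag as 'user'."""
    yield user["user_id"], ("user", user["date_of_birth"])

def activity_mapper(event):
    """Mapper for the activity log: key by user_id, tag as 'event'."""
    yield event["user_id"], ("event", event["url"])

def join_reducer(user_id, tagged_values):
    """Reducer: sees every record for one user_id and pairs them up."""
    date_of_birth = None
    urls = []
    for tag, payload in tagged_values:
        if tag == "user":
            date_of_birth = payload
        else:
            urls.append(payload)     # buffered until the profile is known
    for url in urls:
        yield url, date_of_birth     # None if the user is not in the database

def reduce_side_join(users, events):
    """Single-process stand-in for the framework's sort and grouping."""
    pairs = [pair for user in users for pair in user_mapper(user)]
    pairs += [pair for event in events for pair in activity_mapper(event)]
    pairs.sort(key=itemgetter(0))    # brings both sides of a user_id together
    joined = []
    for user_id, group in groupby(pairs, key=itemgetter(0)):
        joined.extend(join_reducer(user_id, (value for _, value in group)))
    return joined
```

The output pairs each viewed URL with the viewer's date of birth, without a single lookup against the live user database. This sketch buffers a user's events in memory; arranging the sort so that the profile record arrives first would avoid that.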

Map-Side Joins

A bit more complicated..?

The Output of Batch Workflows

  • Search indexes
  • Key-value stores: machine learning systems such as classifiers (spam filters, anomaly detection, image recognition) and recommendation systems (people you may know, products you may be interested in, or related searches)

Comparing Hadoop to Distributed Databases

Hadoop opened up the possibility of indiscriminately dumping data into HDFS, and only later figuring out how to process it further. By contrast, MPP databases typically require careful up-front modeling of the data and query patterns before importing the data into the database’s proprietary storage format.

Beyond MapReduce

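Abstractions on top of MapReduce, such as the higher-level tools mentioned above (Pig, Hive, Cascading, Crunch), make it easier to use.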

But MapReduce’s approach of fully materializing intermediate state has downsides compared to Unix pipes:

  • A MapReduce job can only start when all tasks in the preceding jobs (that generate its inputs) have completed, whereas processes connected by a Unix pipe are started at the same time, with output being consumed as soon as it is produced. Skew or varying load on different machines means that a job often has a few straggler tasks that take much longer to complete than the others. Having to wait until all of the preceding job’s tasks have completed slows down the execution of the workflow as a whole.
  • Mappers are often redundant: they just read back the same file that was just written by a reducer, and prepare it for the next stage of partitioning and sorting. In many cases, the mapper code could be part of the previous reducer: if the reducer output was partitioned and sorted in the same way as mapper output, then reducers could be chained together directly, without interleaving with mapper stages.
  • Storing intermediate state in a distributed filesystem means those files are replicated across several nodes, which is often overkill for such temporary data.

To fix these problems with MapReduce, several new execution engines for distributed batch computations were developed. They are called dataflow engines, and they:

  • avoid alternating map and reduce phases, thus allowing more flexibility
  • avoid writing intermediate state to HDFS
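
The difference between materializing intermediate state and handing records directly from one operator to the next can be hinted at with Python generators. This is only an analogy on a single machine, with made-up operator names; real dataflow engines are far more elaborate.

```python
def parse(lines):
    """Operator 1: extract the URL from each log line."""
    for line in lines:
        fields = line.split()
        if len(fields) >= 7:
            yield fields[6]

def only_pages(urls):
    """Operator 2: drop requests for static assets."""
    for url in urls:
        if not url.endswith((".css", ".js")):
            yield url

def materialized(lines):
    """MapReduce style: the whole intermediate result is stored first."""
    intermediate = list(parse(lines))      # stands in for files on HDFS
    return list(only_pages(intermediate))

def pipelined(lines):
    """Dataflow style: each record flows through as soon as it is produced."""
    return only_pages(parse(lines))        # nothing is stored in between
```

Both variants produce the same records, but pipelined can emit its first result before the input has been read completely, much like processes connected by a Unix pipe.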
