
Mastering Big Data with Distributed Processing

Andreas Kretz


Distributed processing is the backbone of Big Data. How to store and process data efficiently with a scalable solution is the focus of this part of my series “Learning Big Data the right way”.

We will talk about data locality with HDFS. And to top it off, I will show you with a real-world example how distributed processing with MapReduce works.

In part one of the series, I ranted about the extract, transform, load (ETL) problem of SQL databases and how hard it is to scale such systems. If you missed it you can find it here: Link

Finally, distributed processing explained

Distributed processing means that you are not doing the analytics on a single server. You do it in parallel on multiple machines.

A distributed setup usually consists of a master, which manages the whole process, and slaves, which do the actual processing.

To process data in parallel, the master needs to load the data, chop it up into pieces and distribute them to the slaves for computation. Each slave then does the analytics (calculations) you programmed.

Another ETL nightmare

Distributing data from the master to the slaves has one major flaw: because the needed data is stored on some source system, the master first has to load it from that source and store it temporarily.

This approach basically creates the same problems as the ETL process of SQL databases. For very large datasets, loading data to a single master and distributing it to the slaves is very inefficient.

Another extract-bottleneck. 🙁

Waking up from the nightmare with data locality through HDFS

The ETL problem can be avoided by not using a central data store. Storing pieces of the data locally on every processing slave is the key.

The master only keeps track of which piece of data is stored on which slave. For processing, the master then only needs to tell each slave which locally stored block of data it has to process.

This perfectly brings us to the Hadoop Distributed File System, HDFS.

The data locality described above is exactly what HDFS provides. It is the foundation of Hadoop and the first thing you need to learn and understand.

You can think of HDFS as a file system layered on top of the server’s own file system. The difference from a normal file system like NTFS or ext* is that HDFS is a distributed file system.

This means it spans all connected slaves, which are called data nodes. Stored files automatically get chopped up into 128 MB blocks.

These blocks are then automatically distributed over the network to the data nodes. Every node only gets some pieces of the file, depending on the size of the cluster.

To avoid losing data when a server fails, each block is automatically replicated to two other nodes (three copies in total by default).
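If you want to see data locality for yourself, here is a minimal sketch using Hadoop’s Java FileSystem API. It prints the block size, the replication factor and which data nodes hold which block of a file; the file path is a made-up example, and the cluster settings are assumed to come from the usual core-site.xml/hdfs-site.xml on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfo {

    public static void main(String[] args) throws Exception {
        // Picks up the cluster settings from core-site.xml / hdfs-site.xml
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical file path, only for illustration
        Path file = new Path("/data/sensor/measurements.csv");
        FileStatus status = fs.getFileStatus(file);

        System.out.println("Block size:  " + status.getBlockSize());   // typically 128 MB
        System.out.println("Replication: " + status.getReplication()); // typically 3

        // Which data nodes store which block of the file
        for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
            System.out.println("Block at offset " + block.getOffset()
                    + " stored on " + String.join(", ", block.getHosts()));
        }
        fs.close();
    }
}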

Boom done, that’s it, right? All problems solved? Well, not quite.

Very often you need to further process the distributed results to create a desired output. One solution would be to send all the results from the nodes back to the master for further processing.

But, sending results of the nodes back to a single master is a bad idea. It creates another bottleneck.

While the results of the nodes should be a lot smaller than the raw input, they may still be quite large (gigabytes).

This problem can be solved by using MapReduce.

Distributed processing with MapReduce

Since the early days of the Hadoop ecosystem, the MapReduce framework has been one of the main components of Hadoop, alongside the Hadoop Distributed File System, HDFS.

Google implemented MapReduce to analyse the stored HTML content of websites by counting all the HTML tags, all the words and combinations of them (for instance headlines). The output was then used to create the page ranking for Google Search.

That was when everybody started to optimise their websites for Google Search. Serious search engine optimisation was born. That was the year 2004.

Back to our problem. What is the solution for the bottleneck created by sending data back to the master? Well, MapReduce processes the data in two stages.

The map phase and the reduce phase.

In the map phase, the framework reads data from HDFS. Each dataset is called an input record.

Then there is the reduce phase, where the actual computation is done and the results are stored. The storage target can be a database, HDFS again, or something else.

After all, it’s Java, so you can implement whatever you like.

The magic of MapReduce is how the map and reduce phases are implemented and how both phases work together.

The map and reduce phases are parallelised. That means you have multiple map phases (mappers) and reduce phases (reducers) that can run in parallel on your cluster machines.

How MapReduce actually works

First of all, the whole map and reduce process relies heavily on using key/value pairs. That’s what the mappers are for.

In the map phase, input data, for instance a file, gets loaded and transformed into key/value pairs.

When each map phase is done, it sends the created key/value pairs to the reducers, where they get sorted by key. This means that an input record for the reduce phase is a list of values from the mappers that all share the same key.

The reduce phase then does the computation for that key and its values and outputs the results.

How many mappers and reducers can you use in parallel? The number of parallel map and reduce processes depends on how many CPU cores you have in your cluster. Every mapper and every reducer uses one core.

This means that the more CPU cores you have, the more mappers you can run and the faster the extraction is done. The more reducers you use, the faster the actual computation is done.
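To give you a feeling for how this is configured in practice, here is a minimal job driver sketch using Hadoop’s org.apache.hadoop.mapreduce API. The class names DailyAverageMapper and DailyAverageReducer refer to the mapper and reducer sketched in the IoT example further down; they are my own illustration, not code from this article. Note that the number of map tasks follows from the input splits (roughly one per HDFS block), while the number of reduce tasks can be set explicitly.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DailyAverageJob {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "daily sensor averages");
        job.setJarByClass(DailyAverageJob.class);

        // Mapper and reducer as sketched in the IoT example below
        job.setMapperClass(DailyAverageMapper.class);
        job.setReducerClass(DailyAverageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // One map task per input split (roughly one per HDFS block);
        // the number of reduce tasks is set explicitly.
        job.setNumReduceTasks(3);

        FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. the sensor data in HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // must not exist yet

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Hadoop also tries to schedule each map task on a data node that already stores the corresponding block, which is the data locality idea from above.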

To make this more clear, I have prepared an example.

A real world internet of things example

As I said before, MapReduce works in two stages, map and reduce. Often these stages are explained with a word count task.

Personally, I hate this example because counting stuff is too trivial and does not really show you what you can do with MapReduce. Therefore, we are going to use a more realistic use case from the world of the internet of things (IoT).

IoT applications create an enormous amount of data that has to be processed. This data is generated by physical sensors that take measurements, like the room temperature at 8:00 o’clock.

Every measurement consists of a key (the timestamp when the measurement has been taken) and a value (the actual value measured by the sensor).

Because you usually have more than one sensor on your machine, or connected to your system, the key has to be a compound key. In addition to the measurement time, a compound key contains information about the source of the signal.

But, let’s forget about compound keys for now. Today we have only one sensor. Each measurement outputs key/value pairs like: Timestamp-Value.

The goal of this exercise is to create average daily values of that sensor’s data.

How MapReduce deals with IoT data

The image below shows how the map and reduce process works.

First, the map stage loads unsorted data (input records) from the source (e.g. HDFS) by key and value (key:2016-05-01 01:02:03, value:1).

Then, because the goal is to get daily averages, the hour:minute:second information is cut from the timestamp.

That is all that happens in the map phase, nothing more.
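To make that concrete, here is a minimal mapper sketch for this example using Hadoop’s org.apache.hadoop.mapreduce API. The input format (one “timestamp,value” line per record) and the class name DailyAverageMapper are assumptions for illustration, not something prescribed in this article.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input record (assumed format): "2016-05-01 01:02:03,1"
// Output key/value:              "2016-05-01" -> 1.0
public class DailyAverageMapper
        extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    private final Text day = new Text();
    private final DoubleWritable measurement = new DoubleWritable();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] parts = line.toString().split(",");
        if (parts.length != 2) {
            return; // skip malformed records
        }
        // Cut the hour:minute:second part, keep only "yyyy-MM-dd"
        day.set(parts[0].trim().substring(0, 10));
        measurement.set(Double.parseDouble(parts[1].trim()));
        context.write(day, measurement);
    }
}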

After all parallel map phases are done, each key/value pair gets sent to the one reducer that handles all the values for that particular key.

Every reducer input record then has a list of values, and you can calculate (1+5+9)/3, (2+6+7)/3 and (3+4+8)/3. That’s all.
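The matching reducer could look like the following sketch (again my own illustration, same assumptions as the mapper above): it receives all values for one day and writes out their average.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input:  "2016-05-01" -> [1.0, 5.0, 9.0]  (all values for that day)
// Output: "2016-05-01" -> 5.0              (the daily average)
public class DailyAverageReducer
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private final DoubleWritable average = new DoubleWritable();

    @Override
    protected void reduce(Text day, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        long count = 0;
        for (DoubleWritable value : values) {
            sum += value.get();
            count++;
        }
        if (count > 0) {
            average.set(sum / count);
            context.write(day, average);
        }
    }
}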

What do you think you need to do to generate minute averages?

Yes, you need to cut the key differently. You would then cut it like this: “2016-05-01 01:02”, keeping the hour and minute information in the key.
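As a tiny, self-contained illustration (plain Java, no Hadoop needed), cutting the same timestamp at different lengths gives you either the daily key or the minute key:

public class KeyCutDemo {
    public static void main(String[] args) {
        String timestamp = "2016-05-01 01:02:03";
        System.out.println(timestamp.substring(0, 10)); // daily key:  "2016-05-01"
        System.out.println(timestamp.substring(0, 16)); // minute key: "2016-05-01 01:02"
    }
}

In the mapper sketch above, that is the only line you would change.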

What you can also see is why MapReduce is so great for doing parallel work. In this case, the map stage could be done by nine mappers in parallel, because each map is independent of all the others.

The reduce stage could still be done by three tasks in parallel: one for orange, one for blue and one for green.

That means that if your dataset were 10 times as big and you had 10 times the machines, the time to do the calculation would stay the same.

How to scale big data systems

Scaling such a Big Data system is quite easy. Because storage and processing are distributed, all you have to do is add more servers to the cluster.

By adding servers, you increase the number of available disks and hence the storage capacity. You also get more CPUs, which increases the processing capability of the cluster.

Is MapReduce the right thing for you? There must be a catch, right?

I chose to explain distributed processing to you through MapReduce because it is easy to understand. But MapReduce is not synonymous with distributed processing; it is just one implementation.

Should you use it or look for something else?

MapReduce has been invented for tasks like:

  • Analysing server logs to find out what has been going on
  • Analysing network traffic logs to identify network problems
  • Basically any problem that involves counting stuff in very large files

Here’s the problem with MapReduce:

MapReduce has one big restriction: It is a two step process, map and reduce.

Once data is processed through the map and reduce phase, it has to be stored again.

If your computation cannot be done in those two steps, you need to do the whole process again: load the previous results from disk and do another map and reduce run, because results cannot be kept in memory and reused later.
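Structurally, chaining looks like the following sketch: two separate jobs, where the first one has to write its results to HDFS and the second one reads them back in. The paths are placeholders and the mapper/reducer classes for each stage are left out (commented), so this only shows the shape of the process, not a concrete computation.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TwoStageJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input        = new Path(args[0]);
        Path intermediate = new Path(args[1]);  // results of the first pass land on disk here
        Path output       = new Path(args[2]);

        // Stage 1: first map and reduce run, results have to be stored in HDFS again
        Job first = Job.getInstance(conf, "stage 1");
        first.setJarByClass(TwoStageJob.class);
        // first.setMapperClass(...);  first.setReducerClass(...);   // your stage-1 logic
        FileInputFormat.addInputPath(first, input);
        FileOutputFormat.setOutputPath(first, intermediate);
        if (!first.waitForCompletion(true)) {
            System.exit(1);
        }

        // Stage 2: a second full run that has to re-load the intermediate results from disk,
        // because MapReduce cannot keep them in memory between jobs
        Job second = Job.getInstance(conf, "stage 2");
        second.setJarByClass(TwoStageJob.class);
        // second.setMapperClass(...); second.setReducerClass(...);  // your stage-2 logic
        FileInputFormat.addInputPath(second, intermediate);
        FileOutputFormat.setOutputPath(second, output);
        System.exit(second.waitForCompletion(true) ? 0 : 1);
    }
}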

This is where it does not make sense to use MapReduce in my opinion.

Distributed in memory processing with Apache Spark

If you want to do complex stuff like machine learning, MapReduce is not the weapon of choice.

That’s why I’ll show you how distributed in-memory processing with Apache Spark works. We will go over a Spark example where we analyse the Twitch.tv chat and talk about stream and batch processing.

I will also show you a neat trick for prototyping Spark jobs and directly visualising the results.

Follow this link to the view the post: http://iotdonequick.com/2016/08/08/how-everybody-can-harvest-the-power-of-data-mining/

The best way to make sure you don’t miss my next posts is to subscribe to my newsletter. Just put in your e-mail address right here and hit subscribe.

This way I will be able to send you an E-Mail when I have uploaded the next article in this series.

You can also follow me on Twitter, where I share stuff I find throughout the day.

Wanna do more? Please follow this Link to LinkedIn and hit the like button to share this article with your professional network. That would be super awesome!

See you next time!

Andreas

Comments 32

    Author reply:

      Yes you are right. How this mistake got into the final version is a mystery to me.
      Thanks a lot! Already changed it.

  1. As someone who is interested in Big Data but still finishing my undergraduate degree, I enjoy both the informativeness and accessibility of your posts.

    Looking forward to the next one. Thank you!

  2. Andreas, a job well done and continue to keep up the good work. I enjoy your blogs, and they are easy to understand for an old school guy just trying to learn about this topic!

  3. Nice blog with simple examples. It is definitely different from other blogs. I found this very interesting!!!

  4. Thank you very much. Waiting for your next post, I have already subscribed 🙂 Keep up the good work!

  5. Hi Andreas! Awesome articles! Easy and clear to understand, good work.
    Waiting for the next… 🙂
    Thanks!

  6. Hi Andreas,

    Excellent articles, and easy to understand.
    I have one query on MapReduce and Pig.
    When should we write Pig scripts and when should we write MapReduce programs?
    I would appreciate it if you could provide a use case for this.

    Author reply:

      Hi Kabulsha,
      good question! With Pig you can write your analytics in Pig Latin, a SQL-style language. On execution, Pig then creates MapReduce code in the background and runs it. I’d say it’s more beginner-friendly for someone who already knows SQL than writing MapReduce jobs in Java.

      Personally, I never used Pig; I have always written my MapReduce code directly. It is quicker and clearer because you can think in a more key/value-oriented fashion. What I have seen is that Pig Latin is more table-oriented (rows, columns). This is often not very helpful.

      Check out these examples of how to do word count with Pig and MapReduce:
      http://salsahpc.indiana.edu/ScienceCloud/pig_word_count_tutorial.htm
      http://www.macalester.edu/~shoop/sc13/hadoop/html/hadoop/wc-detail.html

      Although the Java code of MapReduce looks complicated, you actually only need nine lines of code to do it (the code inside the map and reduce methods).

      Please let me know if that helps you 🙂

  7. Interesting to read about another Big Data expert’s take on this. However, I think there’s one thing represented incorrectly in “How MapReduce deals with IoT data.” When you switch from getting the daily averages to the minute averages, in the process flow image you show the first “9-Output Records” to be the date of the readings. You then map different dates to the same “3-Output Records.” Shouldn’t the “9-Output Records” be the time and not the date of the reading? Then each reading taken during the same minute can be mapped to the same “3-Output Records” for the Reduce phase. Or is that process flow image out of order in the article and showing the previous example of getting the average by date?

    Author reply:

      Hi Scott, yes you are right. The image is out of order. Big sorry! Thanks for pointing that out. I corrected it.

  8. Hello Andreas,

    The simplicity of your articles is refreshing!

    At the Data Intensive Discovery Initiative at the University at Buffalo, SUNY, an academic, National Laboratory, and commercial consortium we have been working for the past six years to create the capability to deliver massive scale analytics on Supercomputing platforms AND commodity scale-out infrastructures. We run experiments across many scientific domains that have different computing requirements: compute intensive, data intensive and a combination of both.

    We focused on creating a SQL analytics DBMS platform because the SQL language is ubiquitous. Although SQL platforms can only support structured data formats in a row or column format the vast majority of our use cases fell into the structured bucket. The computational world also had a bunch of people developing technologies for the unstructured data challenge.

    There are challenges with scaling out current SQL platforms:

    1) Data locality: To deliver query performance the data needs to be optimally placed into a physical data model across all the storage resources. Performance also requires tightly coupled compute and storage or in-memory execution (limiting database size). Querying multiple big tables through a table join function in current SQL platforms turns the database into mush! You have to make decisions about what questions you will be asking before you build the database. This is troublesome in research because you are always asking new questions and the results lead you down many paths.

    2) Data Modeling and ETL: This takes point 1 even further and expands on the ETL challenges you mentioned in your first article. Logical and Physical Data Modeling takes time, lots of time and effort to get data in the right format in the database so questions can be answered effectively. Ideally we want researchers to get answers to questions at the speed of thought.

    So with our National Laboratory colleagues and XtremeData we co-designed a new architecture SQL analytics database (dbX) that delivers performance on separate compute and storage using inspiration from the two+ decades of work on the Message Passing Interface used in the Supercomputing community.

    dbX addresses the above challenges because it is Schema Agnostic. We can simply stuff it with data as fast as our computation intensive (or IOT) applications can generate the data and create near-real-time ad hoc discovery capabilities in dbX for our scientists and researchers to consume. In fact, because dbX uses a PostgreSQL front end in most cases the researchers can build new large scale databases themselves!

    XtremeData has partnered with Amazon Web Services and Microsoft Azure to make it simple for people to access the capabilities of dbX and provides the technology to academic institutions for research purposes at no cost for use on premise.

    Keep up the good work!

    Regards,

    Todd

  9. Thank you for the intelligent pictorial explanation of Big Data. I can now go ahead with my decision to study Big data & Business Intelligence for my masters. Cheers.

  10. Hello Andreas,
    Very clear and good explanation. I am waiting for your next article on ‘Distributed in memory processing with Apache Spark’.

    Regards,
    Madhuker

  11. Andreas,

    Thank you for the very detailed and simple style of writeup. This has been very informative and helpful.

    However, I have a clarification about the example in “How MapReduce deals with IoT data”.
    1. Is there any specific reason for grouping the 9 records in sets of 3 as input records in the map phase?
    2. In the reduce phase, the input records are grouped in a specific order such that the average of these sets results in 5. Why is it not { [(1+2+3)/3] + [(4+5+6)/3] + [(7+8+9)/3] }/3?

    Sorry if my questions were too silly.

    Thanks & Best Regards,
    Nalini

    Author reply:

      Hi Nalini, please, there are no silly questions!

      Regarding your questions:

      1: I grouped the input records of the map phase into sets of three just for this example. I did this to show that three mappers and three reducers are used, while also keeping the input records separated. In real life, the input records for a single mapper number in the tens or hundreds of thousands. They will also come from the same source, like a file that is stored in HDFS.

      2: In the map phase, the order of the keys does not matter. But when each key/value pair (timestamp, value) is moved to the reduce phase, it gets sorted by key. This means that each reducer only gets pairs with equal keys (the timestamp).

      Did that help?
