MapReduce - munching through Big Data
How Google tackled the problem of processing enormous amounts of data, and how you can do the same with Go.
It's been a while since the last post, and I have to apologize for the long wait. The past few weeks have been quite busy, but I finally managed to complete another article. I hope you'll enjoy it.
Map and Reduce
This is going to be a boring article about two boring functions, map() and reduce(). Here is the story:
You have a list with elements of type, say, string.
var list []string
You define a function that takes a string and produces an int. Let's say you want to know the length of a string.
func length(s string) int {
	return len(s)
}
Now you define a function called map() that takes this function and applies it to each of the elements in the list and returns a list of all results.
func mäp(list []string, fn func(string) int) []int { // "map" is a reserved word, "mäp" isn't
	res := make([]int, len(list))
	for i, elem := range list {
		res[i] = fn(elem)
	}
	return res
}
Finally, you define another function reduce() that takes the result list and boils it down to a single result.
func reduce(list []int, fn func(int, int) int) (res int) {
	for _, elem := range list {
		res = fn(res, elem)
	}
	return res
}

func sum(a, b int) int {
	return a + b
}
Now you can wire it all up.
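func main() {
	list := []string{"a", "bcd", "ef", "g", "hij"}
	// Go's built-in len cannot be passed as a function value, so we pass length.
	res := reduce(mäp(list, length), sum)
	fmt.Println(res) // prints 10; requires import "fmt"
}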
Here is the whole thing visualized. (Click on Play to start the animation.)
That's it. End of the story. Pretty boring, eh?
But wait! …
… what's this?
Looks like we just abstracted away the concept of for loops!
Now if that's not something to brag about at the next Gopher meetup…
However, does this buy us anything else? Indeed it does:
First, no more off-by-one index errors.
Second, and more importantly, if the mapped function fn does not depend on previous results, it can be trivially called in a concurrent manner.
How to do this? Easy: Split the list into n pieces and pass them to n independently running mappers. Next, have the mappers run on separate CPU cores, or even on separate CPUs.
Imagine the speed boost you'll get. Map and reduce, it seems, form a fundamental concept for efficient distributed loops.
Lemme repeat that. By abstracting away the very concept of looping, you can implement looping any way you want, including implementing it in a way that scales nicely with extra hardware.
Joel Spolsky, Can Your Programming Language Do This? (2006)
From map() and reduce() to MapReduce
Google researchers took the map/reduce concept and scaled it up to search engine level (I leave the exact definition of “search engine level” as an exercise for the reader). MapReduce was born.
The result was a highly scalable, fault-tolerant data processing framework with the two functions map() and reduce() at its core.
Here is how it works.
Let's say we have a couple of text files and we want to calculate the average count of nouns and verbs per file.
Our imaginary test machine has eight CPU cores. So we can set up eight processing entities/work units/actors (or whatever you want to call them):
- One input reader
- Three mappers
- One shuffler, or partitioner
- Two reducers
- One output writer
The input reader
The input reader fetches the documents, turns each one into a list of words, and distributes the lists among the mappers.
The mapper
Each of the mappers reads the input list word by word and counts the nouns and verbs in that list.
The result is a key-value list of word types (noun, verb) and counts. For example, our three mappers could return these counts:
mapper 1:
nouns: 7
verbs: 4
mapper 2:
nouns: 5
verbs: 8
mapper 3:
nouns: 6
verbs: 3
When a mapper has finished, it passes the result on to the shuffler.
The shuffler
The shuffler receives the output lists from the mappers. It rearranges the data by key; that's why it is also referred to as a “partitioning function”. In our example, the shuffler generates two lists, one for nouns and one for verbs:
list 1:
nouns: 7
nouns: 5
nouns: 6
list 2:
verbs: 4
verbs: 8
verbs: 3
The shuffler then passes each list to one of the two reducers.
The reducer
Each reducer receives a list with a couple of counts. It simply runs through the list, adds up all the counts, and divides the result by the number of counts. Both reducers then send their output to the output writer.
Back to our example. The first reducer would calculate an average of
(7 + 5 + 6) / 3 = 6
and the other one would return
(4 + 8 + 3) / 3 = 5
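The reducer's arithmetic fits in a few lines. This sketch assumes the reducer receives a plain slice of counts for one key (the name average is illustrative). It uses float64 so that averages that are not whole numbers are not truncated, and it returns 0 for an empty list instead of dividing by zero.

```go
package main

import "fmt"

// average adds up all counts and divides the sum by the number of counts.
func average(counts []int) float64 {
	if len(counts) == 0 {
		return 0 // avoid dividing by zero
	}
	sum := 0
	for _, c := range counts {
		sum += c
	}
	return float64(sum) / float64(len(counts))
}

func main() {
	fmt.Println(average([]int{7, 5, 6})) // 6
	fmt.Println(average([]int{4, 8, 3})) // 5
}
```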
The output writer
All the output writer has to do is collect the results from the reducers and write them to disk or pass them on to some consumer process.
Summary
To make all this less abstract, here is the same as an animation. (Click on Play.)
This concept easily scales beyond a single multi-CPU machine. The involved entities (input reader, mapper, shuffler, reducer, and output writer) can even run on different machines if required.
But MapReduce is more than just some distributed version of map() and reduce(). There are a couple of additional bonuses that we get from a decent MapReduce implementation.
- A good deal of the functionality is the same for any kind of map/reduce task. These parts can be implemented as a MapReduce framework where the user just needs to provide the map and reduce functions.
- The MapReduce framework can provide fault recovery. If a node fails, the framework can re-execute the affected tasks on another node.
- With fault tolerance mechanisms in place, MapReduce can run on large clusters of commodity hardware.
The code
The code below is a very simple version of the noun/verb average calculation. To keep the code short and clear, the mapper does not actually identify nouns and verbs. Instead, the input text is just a list of strings that read either “noun” or “verb”. Also, the reducer does not receive key/value pairs but rather just the values. We already know that one reducer receives the nouns and the other receives the verbs.
package main

import (
	"fmt"
	"sync"
)
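// mapper counts how often each word occurs in its input text
// and sends the resulting map to the shuffler.
func mapper(in <-chan string, out chan<- map[string]int) {
	count := map[string]int{}
	for word := range in {
		count[word]++
	}
	out <- count
	close(out)
}

// reducer averages all counts it receives and sends the result on.
func reducer(in <-chan int, out chan<- float32) {
	sum, count := 0, 0
	for n := range in {
		sum += n
		count++
	}
	out <- float32(sum) / float32(count)
	close(out)
}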
// inputReader feeds the words of each input text to one of the mappers.
func inputReader(out [3]chan<- string) {
	input := [][]string{
		{"noun", "verb", "verb", "noun", "noun"},
		{"verb", "verb", "verb", "noun", "noun", "verb"},
		{"noun", "noun", "verb", "noun"},
	}
	for i := range out {
		go func(ch chan<- string, word []string) {
			for _, w := range word {
				ch <- w
			}
			close(ch)
		}(out[i], input[i])
	}
}
// shuffler is adapted from the merge function in the Pipelines article of the Go Blog.
// It sends all noun counts to out[0] and all verb counts to out[1].
func shuffler(in []<-chan map[string]int, out [2]chan<- int) {
	var wg sync.WaitGroup
	wg.Add(len(in))
	for _, ch := range in {
		go func(c <-chan map[string]int) {
			for m := range c {
				nc, ok := m["noun"]
				if ok {
					out[0] <- nc
				}
				vc, ok := m["verb"]
				if ok {
					out[1] <- vc
				}
			}
			wg.Done()
		}(ch)
	}
	// Close both outputs once all mapper channels are drained.
	go func() {
		wg.Wait()
		close(out[0])
		close(out[1])
	}()
}
// outputWriter prints the averages it receives from the reducers.
func outputWriter(in []<-chan float32) {
	var wg sync.WaitGroup
	wg.Add(len(in))
	// in[0] carries the noun results, in[1] the verb results
	// (matching the shuffler's out[0] and out[1]).
	name := []string{"noun", "verb"}
	for i := 0; i < len(in); i++ {
		go func(n int, c <-chan float32) {
			for avg := range c {
				fmt.Printf("Average number of %ss per input text: %f\n", name[n], avg)
			}
			wg.Done()
		}(i, in[i])
	}
	wg.Wait()
}
func main() {
	// I could have used loops instead, to create arrays or slices of channels.
	// Apparently, copy/paste has won.
	size := 10
	text1 := make(chan string, size)
	text2 := make(chan string, size)
	text3 := make(chan string, size)
	map1 := make(chan map[string]int, size)
	map2 := make(chan map[string]int, size)
	map3 := make(chan map[string]int, size)
	reduce1 := make(chan int, size)
	reduce2 := make(chan int, size)
	avg1 := make(chan float32, size)
	avg2 := make(chan float32, size)
	go inputReader([3]chan<- string{text1, text2, text3})
	go mapper(text1, map1)
	go mapper(text2, map2)
	go mapper(text3, map3)
	go shuffler([]<-chan map[string]int{map1, map2, map3}, [2]chan<- int{reduce1, reduce2})
	go reducer(reduce1, avg1)
	go reducer(reduce2, avg2)
	outputWriter([]<-chan float32{avg1, avg2})
}
The code is go get-able from GitHub. Use the -d flag so that the binary does not end up in $GOPATH/bin.
go get -d github.com/appliedgo/mapreduce
cd $GOPATH/src/github.com/appliedgo/mapreduce
go run mapreduce.go
This code also runs in the Go Playground.
Homework assignment 😉: Add logging to the code to visualize the control flow. I intentionally did not add logging statements to keep the code easy to read.