| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- package main
- import (
- "fmt"
- "os"
- "path/filepath"
- "sort"
- )
/*----- MapReduce framework -----*/
// chanSize is the buffer capacity used for every channel the framework
// creates (file names, mapper output, shuffle, key lists and final results).
const chanSize = 1000
- type Config struct {
- path string
- mapCnt int
- reduceCnt int
- mapFunc func([]byte, chan Pair)
- combineFunc func(string, []int) Pair
- reduceFunc func(string, []int) Pair
- }
// Pair is a single key/value record passed between the stages of a job.
//
// The framework reserves the empty key: a Pair whose key is "" signals that
// a producer has finished rather than carrying data.
type Pair struct {
	key   string
	value int
}
// AggPair holds every value collected for one key. It is the unit of work
// that Run hands to a Reducer.
type AggPair struct {
	key  string
	list []int
}
- /* Pair Slice type */
- type PairSlice []Pair
- /* Frame-defined mapper */
- func Mapper(config Config, filename chan string, shuffle chan Pair) {
- output := make(chan Pair, chanSize)
- merge := make(map[string][]int)
- for name := range filename {
- text, _ := os.ReadFile(name) // read file content
- go config.mapFunc(text, output)
- for pair := range output {
- if pair.key == "" { // map finish
- break
- }
- merge[pair.key] = append(merge[pair.key], pair.value) // merge in map stage
- }
- }
- for key, list := range merge {
- shuffle <- config.combineFunc(key, list) // combine local result, to shuffle stage
- }
- shuffle <- Pair{"", 0} // mapper finish
- }
- /* Frame-defined reducer */
- func Reducer(config Config, keylist chan AggPair, collect chan Pair) {
- for aggPair := range keylist {
- collect <- config.reduceFunc(aggPair.key, aggPair.list) // to final stage
- }
- collect <- Pair{"", 0} // reducer finish
- }
- /* MapReduce job */
- func Run(config Config) (PairSlice, error) {
- workerCnt := 0
- dataDir, err := os.ReadDir(config.path) // get directory info
- if err != nil {
- return nil, fmt.Errorf("Directory not found!")
- }
- /* Map stage */
- filename := make(chan string, chanSize)
- go func() {
- defer close(filename)
- for _, file := range dataDir {
- if file.IsDir() {
- continue
- }
- filename <- filepath.Join(config.path, file.Name()) // to map stage
- }
- }()
- shuffle := make(chan Pair, chanSize)
- for i := 0; i < config.mapCnt; i++ {
- workerCnt++
- go Mapper(config, filename, shuffle) // start mapper
- }
- /* Shuffle stage */
- merge := make(map[string][]int)
- for pair := range shuffle { // get from map stage
- if pair.key == "" {
- workerCnt--
- if workerCnt == 0 {
- break
- }
- } else {
- merge[pair.key] = append(merge[pair.key], pair.value) // merge in shuffle stage
- }
- }
- /* Reduce stage */
- keylist := make(chan AggPair, chanSize)
- go func() {
- defer close(keylist)
- for key, list := range merge {
- keylist <- AggPair{key, list} // to reduce stage
- }
- }()
- collect := make(chan Pair, chanSize)
- for i := 0; i < config.reduceCnt; i++ {
- workerCnt++
- go Reducer(config, keylist, collect) // start reducer
- }
- /* Final stage */
- output := make(PairSlice, 0)
- for pair := range collect { // get from reduce stage
- if pair.key == "" {
- workerCnt--
- if workerCnt == 0 {
- break
- }
- } else {
- output = append(output, pair)
- }
- }
- return output, nil
- }
/*----- Custom functions -----*/
- /* Functions used for sort */
- func (s PairSlice) Len() int {
- return len(s)
- }
- func (s PairSlice) Swap(i, j int) {
- s[i], s[j] = s[j], s[i]
- }
- func (s PairSlice) Less(i, j int) bool {
- if s[i].value == s[j].value {
- return s[i].key < s[j].key
- }
- return s[i].value > s[j].value
- }
- /* User-defined map function */
- func Map(text []byte, output chan Pair) {
- word := ""
- for i := range text {
- if text[i] >= 'A' && text[i] <= 'Z' {
- text[i] += 'a' - 'A' // to lower case
- }
- if text[i] >= 'a' && text[i] <= 'z' || word != "" && text[i] == '\'' {
- word += string(text[i]) // extend word
- } else if word != "" {
- output <- Pair{word, 1}
- word = ""
- }
- }
- if word != "" {
- output <- Pair{word, 1}
- }
- output <- Pair{"", 0} // map finish
- }
- /* User-defined reduce function */
- func Reduce(key string, list []int) Pair {
- sum := 0
- for _, value := range list {
- sum += value
- }
- return Pair{key, sum} // reduce finish
- }
- func main() {
- config := Config{
- path: "../testdata", // directory that stores data
- mapCnt: 4, // number of mapper
- reduceCnt: 4, // number of reducer
- mapFunc: Map, // map function
- combineFunc: Reduce, // combine function
- reduceFunc: Reduce} // reduce function
- answer, err := Run(config)
- if err != nil {
- fmt.Println(err)
- return
- }
- sort.Sort(answer)
- for _, pair := range answer {
- fmt.Printf("%s\t%d\n", pair.key, pair.value)
- }
- }
|