package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
)

/*----- MapReduce frame -----*/

// chanSize is the buffer size of every channel that connects two stages.
const chanSize = 1000

// Config describes one MapReduce job.
type Config struct {
	path        string                   // directory whose regular entries are the input files
	mapCnt      int                      // number of Mapper goroutines to start (must be positive)
	reduceCnt   int                      // number of Reducer goroutines to start (must be positive)
	mapFunc     func([]byte, chan Pair)  // emits pairs for one file, then a Pair with an empty key
	combineFunc func(string, []int) Pair // folds the values one mapper collected for a key
	reduceFunc  func(string, []int) Pair // folds all values collected for a key
}

// Pair is a single key/value record. A Pair whose key is empty is reserved
// as the "finished" marker on every channel of the frame, so real records
// must never use the empty key.
type Pair struct {
	key   string
	value int
}

// AggPair is a key together with every value collected for it.
type AggPair struct {
	key  string
	list []int
}

// PairSlice is a list of pairs; it implements sort.Interface.
type PairSlice []Pair

// Mapper is the frame-defined map worker. It takes file names from filename
// until that channel is closed, runs config.mapFunc on each file's content,
// groups the emitted values by key, and finally sends one combined pair per
// key to shuffle followed by a finish marker (a Pair with an empty key).
//
// A file that cannot be read is reported on stderr and skipped.
func Mapper(config Config, filename chan string, shuffle chan Pair) {
	output := make(chan Pair, chanSize)
	merge := make(map[string][]int)
	for name := range filename {
		text, err := os.ReadFile(name)
		if err != nil {
			// Best effort: one unreadable file must not abort the whole job.
			fmt.Fprintln(os.Stderr, "skipping unreadable file:", err)
			continue
		}
		go config.mapFunc(text, output)
		for pair := range output {
			if pair.key == "" { // mapFunc is done with this file
				break
			}
			merge[pair.key] = append(merge[pair.key], pair.value)
		}
	}
	for key, list := range merge {
		shuffle <- config.combineFunc(key, list) // local combine before the shuffle stage
	}
	shuffle <- Pair{} // mapper finished
}

// Reducer is the frame-defined reduce worker. It applies config.reduceFunc to
// every aggregated key received from keylist until that channel is closed,
// sends each result to collect, and then sends a finish marker (a Pair with
// an empty key).
func Reducer(config Config, keylist chan AggPair, collect chan Pair) {
	for aggPair := range keylist {
		collect <- config.reduceFunc(aggPair.key, aggPair.list)
	}
	collect <- Pair{} // reducer finished
}

// drain receives from ch until it has seen one finish marker (a Pair with an
// empty key) from each of the given number of workers, handing every other
// pair to handle. It returns immediately when workers is not positive.
func drain(ch chan Pair, workers int, handle func(Pair)) {
	for workers > 0 {
		pair := <-ch
		if pair.key == "" {
			workers--
			continue
		}
		handle(pair)
	}
}

// Run executes a whole MapReduce job: it feeds every non-directory entry of
// config.path to config.mapCnt mappers, groups their output by key, reduces
// the groups with config.reduceCnt reducers and returns the reduced pairs in
// unspecified order.
//
// It returns an error when a worker count is not positive (no worker would
// ever report completion, so the job could not terminate) or when the input
// directory cannot be read.
func Run(config Config) (PairSlice, error) {
	if config.mapCnt <= 0 || config.reduceCnt <= 0 {
		return nil, fmt.Errorf("invalid worker counts (mapCnt=%d, reduceCnt=%d): both must be positive",
			config.mapCnt, config.reduceCnt)
	}
	dataDir, err := os.ReadDir(config.path)
	if err != nil {
		return nil, fmt.Errorf("reading directory %q: %w", config.path, err)
	}

	/* Map stage */
	filename := make(chan string, chanSize)
	go func() {
		defer close(filename) // lets the mappers leave their receive loop
		for _, file := range dataDir {
			if file.IsDir() {
				continue
			}
			filename <- filepath.Join(config.path, file.Name())
		}
	}()
	shuffle := make(chan Pair, chanSize)
	for i := 0; i < config.mapCnt; i++ {
		go Mapper(config, filename, shuffle)
	}

	/* Shuffle stage */
	merge := make(map[string][]int)
	drain(shuffle, config.mapCnt, func(pair Pair) {
		merge[pair.key] = append(merge[pair.key], pair.value)
	})

	/* Reduce stage */
	keylist := make(chan AggPair, chanSize)
	go func() {
		defer close(keylist) // lets the reducers leave their receive loop
		for key, list := range merge {
			keylist <- AggPair{key: key, list: list}
		}
	}()
	collect := make(chan Pair, chanSize)
	for i := 0; i < config.reduceCnt; i++ {
		go Reducer(config, keylist, collect)
	}

	/* Final stage */
	output := make(PairSlice, 0, len(merge))
	drain(collect, config.reduceCnt, func(pair Pair) {
		output = append(output, pair)
	})
	return output, nil
}

/*----- Custom function -----*/

// Len reports the number of pairs in s.
func (s PairSlice) Len() int { return len(s) }

// Swap exchanges the pairs at positions i and j.
func (s PairSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// Less orders pairs by descending value, breaking ties by ascending key.
func (s PairSlice) Less(i, j int) bool {
	if s[i].value == s[j].value {
		return s[i].key < s[j].key
	}
	return s[i].value > s[j].value
}

// Map is the user-defined map function for word counting. It splits text
// into words made of ASCII letters (lower-cased) and, after the first
// letter, apostrophes. It sends Pair{word, 1} for every word and finally a
// Pair with an empty key to signal completion. text is not modified.
func Map(text []byte, output chan Pair) {
	var word []byte // reused buffer for the word being built
	for _, c := range text {
		if c >= 'A' && c <= 'Z' {
			c += 'a' - 'A' // lower-case the local copy only
		}
		if (c >= 'a' && c <= 'z') || (len(word) > 0 && c == '\'') {
			word = append(word, c)
			continue
		}
		if len(word) > 0 {
			output <- Pair{key: string(word), value: 1}
			word = word[:0]
		}
	}
	if len(word) > 0 {
		output <- Pair{key: string(word), value: 1}
	}
	output <- Pair{} // map finished
}

// Reduce is the user-defined reduce (and combine) function: it returns key
// paired with the sum of list.
func Reduce(key string, list []int) Pair {
	sum := 0
	for _, value := range list {
		sum += value
	}
	return Pair{key: key, value: sum}
}

// main counts the words of every file in ../testdata and prints them as
// "word<TAB>count", most frequent first.
func main() {
	config := Config{
		path:        "../testdata", // directory that stores data
		mapCnt:      4,             // number of mappers
		reduceCnt:   4,             // number of reducers
		mapFunc:     Map,           // map function
		combineFunc: Reduce,        // combine function
		reduceFunc:  Reduce,        // reduce function
	}
	answer, err := Run(config)
	if err != nil {
		fmt.Println(err)
		return
	}
	sort.Sort(answer)
	for _, pair := range answer {
		fmt.Printf("%s\t%d\n", pair.key, pair.value)
	}
}