|
@@ -0,0 +1,141 @@
|
|
|
|
|
+package main
|
|
|
|
|
+
|
|
|
|
|
+import (
|
|
|
|
|
+ "fmt"
|
|
|
|
|
+ "io/ioutil"
|
|
|
|
|
+ "path/filepath"
|
|
|
|
|
+ "sort"
|
|
|
|
|
+ "strings"
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
// Pair is a single (word, count) record passed between pipeline stages.
//
// The workers in this file also send a Pair with an empty key as an
// end-of-stream marker, so a real record always has a non-empty key.
type Pair struct {
	key   string // the word being counted
	value int    // number of occurrences attributed to key
}
|
|
|
|
|
+
|
|
|
|
|
// AggPair groups every partial count collected for one word so that a
// reduce worker can fold them into a single total.
type AggPair struct {
	key  string // the word the partial counts belong to
	list []int  // partial counts, one per Pair received for key
}
|
|
|
|
|
+
|
|
|
|
|
+type PairSlice []Pair
|
|
|
|
|
+
|
|
|
|
|
+func (s PairSlice) Len() int {
|
|
|
|
|
+ return len(s)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s PairSlice) Swap(i, j int) {
|
|
|
|
|
+ s[i], s[j] = s[j], s[i]
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (s PairSlice) Less(i, j int) bool {
|
|
|
|
|
+ if s[i].value == s[j].value {
|
|
|
|
|
+ return s[i].key < s[j].key
|
|
|
|
|
+ } else {
|
|
|
|
|
+ return s[i].value > s[j].value
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func MapWorker(filename chan string, shuffle chan Pair) {
|
|
|
|
|
+ dict := make(map[string]int)
|
|
|
|
|
+ for name := range filename {
|
|
|
|
|
+ data, _ := ioutil.ReadFile(name)
|
|
|
|
|
+ word := ""
|
|
|
|
|
+ for i := range data {
|
|
|
|
|
+ if data[i] >= 'A' && data[i] <= 'Z' {
|
|
|
|
|
+ data[i] += 'a' - 'A'
|
|
|
|
|
+ }
|
|
|
|
|
+ if data[i] >= 'a' && data[i] <= 'z' || word != "" && data[i] == '\'' {
|
|
|
|
|
+ word = strings.Join([]string{word, string(data[i])}, "")
|
|
|
|
|
+ } else if word != "" {
|
|
|
|
|
+ dict[word]++
|
|
|
|
|
+ word = ""
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if word != "" {
|
|
|
|
|
+ dict[word]++
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ for word, cnt := range dict {
|
|
|
|
|
+ shuffle <- Pair{word, cnt}
|
|
|
|
|
+ }
|
|
|
|
|
+ shuffle <- Pair{"", 0}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func ReduceWorker(wordlist chan AggPair, output chan Pair) {
|
|
|
|
|
+ for aggPair := range wordlist {
|
|
|
|
|
+ sum := 0
|
|
|
|
|
+ for _, value := range aggPair.list {
|
|
|
|
|
+ sum += value
|
|
|
|
|
+ }
|
|
|
|
|
+ output <- Pair{aggPair.key, sum}
|
|
|
|
|
+ }
|
|
|
|
|
+ output <- Pair{"", 0}
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func main() {
|
|
|
|
|
+ const path = "../testdata"
|
|
|
|
|
+ workerCnt := 0
|
|
|
|
|
+
|
|
|
|
|
+ dataDir, err := ioutil.ReadDir(path)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ fmt.Println("Directory not found!")
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ filename := make(chan string, 1000)
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ for _, file := range dataDir {
|
|
|
|
|
+ filename <- filepath.Join(path, file.Name())
|
|
|
|
|
+ }
|
|
|
|
|
+ close(filename)
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ shuffle := make(chan Pair, 1000)
|
|
|
|
|
+ for i := 0; i < 20; i++ {
|
|
|
|
|
+ workerCnt++
|
|
|
|
|
+ go MapWorker(filename, shuffle)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ dict := make(map[string][]int)
|
|
|
|
|
+ for pair := range shuffle {
|
|
|
|
|
+ if pair.key == "" {
|
|
|
|
|
+ workerCnt--
|
|
|
|
|
+ if workerCnt == 0 {
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ dict[pair.key] = append(dict[pair.key], pair.value)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ wordlist := make(chan AggPair, 1000)
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ for word, list := range dict {
|
|
|
|
|
+ wordlist <- AggPair{word, list}
|
|
|
|
|
+ }
|
|
|
|
|
+ close(wordlist)
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ output := make(chan Pair, 1000)
|
|
|
|
|
+ for i := 0; i < 1000; i++ {
|
|
|
|
|
+ workerCnt++
|
|
|
|
|
+ go ReduceWorker(wordlist, output)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ answer := make(PairSlice, 0)
|
|
|
|
|
+ for pair := range output {
|
|
|
|
|
+ if pair.key == "" {
|
|
|
|
|
+ workerCnt--
|
|
|
|
|
+ if workerCnt == 0 {
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ answer = append(answer, pair)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ sort.Sort(answer)
|
|
|
|
|
+ for _, pair := range answer {
|
|
|
|
|
+ fmt.Println(pair.key, pair.value)
|
|
|
|
|
+ }
|
|
|
|
|
+}
|