main.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package main
import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"strings"
)
  9. type Pair struct {
  10. key string
  11. value int
  12. }
  13. type AggPair struct {
  14. key string
  15. list []int
  16. }
  17. type PairSlice []Pair
  18. func (s PairSlice) Len() int {
  19. return len(s)
  20. }
  21. func (s PairSlice) Swap(i, j int) {
  22. s[i], s[j] = s[j], s[i]
  23. }
  24. func (s PairSlice) Less(i, j int) bool {
  25. if s[i].value == s[j].value {
  26. return s[i].key < s[j].key
  27. } else {
  28. return s[i].value > s[j].value
  29. }
  30. }
  31. func MapWorker(filename chan string, shuffle chan Pair) {
  32. dict := make(map[string]int)
  33. for name := range filename {
  34. data, _ := ioutil.ReadFile(name)
  35. word := ""
  36. for i := range data {
  37. if data[i] >= 'A' && data[i] <= 'Z' {
  38. data[i] += 'a' - 'A'
  39. }
  40. if data[i] >= 'a' && data[i] <= 'z' || word != "" && data[i] == '\'' {
  41. word = strings.Join([]string{word, string(data[i])}, "")
  42. } else if word != "" {
  43. dict[word]++
  44. word = ""
  45. }
  46. }
  47. if word != "" {
  48. dict[word]++
  49. }
  50. }
  51. for word, cnt := range dict {
  52. shuffle <- Pair{word, cnt}
  53. }
  54. shuffle <- Pair{"", 0}
  55. }
  56. func ReduceWorker(wordlist chan AggPair, output chan Pair) {
  57. for aggPair := range wordlist {
  58. sum := 0
  59. for _, value := range aggPair.list {
  60. sum += value
  61. }
  62. output <- Pair{aggPair.key, sum}
  63. }
  64. output <- Pair{"", 0}
  65. }
  66. func main() {
  67. const path = "../testdata"
  68. workerCnt := 0
  69. dataDir, err := ioutil.ReadDir(path)
  70. if err != nil {
  71. fmt.Println("Directory not found!")
  72. return
  73. }
  74. filename := make(chan string, 1000)
  75. go func() {
  76. for _, file := range dataDir {
  77. filename <- filepath.Join(path, file.Name())
  78. }
  79. close(filename)
  80. }()
  81. shuffle := make(chan Pair, 1000)
  82. for i := 0; i < 20; i++ {
  83. workerCnt++
  84. go MapWorker(filename, shuffle)
  85. }
  86. dict := make(map[string][]int)
  87. for pair := range shuffle {
  88. if pair.key == "" {
  89. workerCnt--
  90. if workerCnt == 0 {
  91. break
  92. }
  93. } else {
  94. dict[pair.key] = append(dict[pair.key], pair.value)
  95. }
  96. }
  97. wordlist := make(chan AggPair, 1000)
  98. go func() {
  99. for word, list := range dict {
  100. wordlist <- AggPair{word, list}
  101. }
  102. close(wordlist)
  103. }()
  104. output := make(chan Pair, 1000)
  105. for i := 0; i < 1000; i++ {
  106. workerCnt++
  107. go ReduceWorker(wordlist, output)
  108. }
  109. answer := make(PairSlice, 0)
  110. for pair := range output {
  111. if pair.key == "" {
  112. workerCnt--
  113. if workerCnt == 0 {
  114. break
  115. }
  116. } else {
  117. answer = append(answer, pair)
  118. }
  119. }
  120. sort.Sort(answer)
  121. for _, pair := range answer {
  122. fmt.Println(pair.key, pair.value)
  123. }
  124. }