main.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "sort"
  7. )
  /*----- MapReduce framework -----*/
  // chanSize is the buffer capacity used for the framework's channels, so a
  // producer can run ahead of its consumer by up to this many items.
  const (
  	chanSize = 1000
  )
  10. type Config struct {
  11. path string
  12. mapCnt int
  13. reduceCnt int
  14. mapFunc func([]byte, chan Pair)
  15. combineFunc func(string, []int) Pair
  16. reduceFunc func(string, []int) Pair
  17. }
  // Pair is a single key/value record passed between stages. The framework
  // treats a Pair whose key is empty as an end-of-stream marker.
  type Pair struct {
  	key   string // grouping key
  	value int    // value associated with key
  }
  // AggPair carries one key together with every value gathered for it; it is
  // the unit of work handed to a reducer.
  type AggPair struct {
  	key  string // key shared by all values in list
  	list []int  // values gathered for key
  }
  28. /* Pair Slice type */
  29. type PairSlice []Pair
  30. /* Frame-defined mapper */
  31. func Mapper(config Config, filename chan string, shuffle chan Pair) {
  32. output := make(chan Pair, chanSize)
  33. merge := make(map[string][]int)
  34. for name := range filename {
  35. text, _ := os.ReadFile(name) // read file content
  36. go config.mapFunc(text, output)
  37. for pair := range output {
  38. if pair.key == "" { // map finish
  39. break
  40. }
  41. merge[pair.key] = append(merge[pair.key], pair.value) // merge in map stage
  42. }
  43. }
  44. for key, list := range merge {
  45. shuffle <- config.combineFunc(key, list) // combine local result, to shuffle stage
  46. }
  47. shuffle <- Pair{"", 0} // mapper finish
  48. }
  49. /* Frame-defined reducer */
  50. func Reducer(config Config, keylist chan AggPair, collect chan Pair) {
  51. for aggPair := range keylist {
  52. collect <- config.reduceFunc(aggPair.key, aggPair.list) // to final stage
  53. }
  54. collect <- Pair{"", 0} // reducer finish
  55. }
  56. /* MapReduce job */
  57. func Run(config Config) (PairSlice, error) {
  58. workerCnt := 0
  59. dataDir, err := os.ReadDir(config.path) // get directory info
  60. if err != nil {
  61. return nil, fmt.Errorf("Directory not found!")
  62. }
  63. /* Map stage */
  64. filename := make(chan string, chanSize)
  65. go func() {
  66. defer close(filename)
  67. for _, file := range dataDir {
  68. if file.IsDir() {
  69. continue
  70. }
  71. filename <- filepath.Join(config.path, file.Name()) // to map stage
  72. }
  73. }()
  74. shuffle := make(chan Pair, chanSize)
  75. for i := 0; i < config.mapCnt; i++ {
  76. workerCnt++
  77. go Mapper(config, filename, shuffle) // start mapper
  78. }
  79. /* Shuffle stage */
  80. merge := make(map[string][]int)
  81. for pair := range shuffle { // get from map stage
  82. if pair.key == "" {
  83. workerCnt--
  84. if workerCnt == 0 {
  85. break
  86. }
  87. } else {
  88. merge[pair.key] = append(merge[pair.key], pair.value) // merge in shuffle stage
  89. }
  90. }
  91. /* Reduce stage */
  92. keylist := make(chan AggPair, chanSize)
  93. go func() {
  94. defer close(keylist)
  95. for key, list := range merge {
  96. keylist <- AggPair{key, list} // to reduce stage
  97. }
  98. }()
  99. collect := make(chan Pair, chanSize)
  100. for i := 0; i < config.reduceCnt; i++ {
  101. workerCnt++
  102. go Reducer(config, keylist, collect) // start reducer
  103. }
  104. /* Final stage */
  105. output := make(PairSlice, 0)
  106. for pair := range collect { // get from reduce stage
  107. if pair.key == "" {
  108. workerCnt--
  109. if workerCnt == 0 {
  110. break
  111. }
  112. } else {
  113. output = append(output, pair)
  114. }
  115. }
  116. return output, nil
  117. }
  /*----- Custom functions -----*/
  119. /* Functions used for sort */
  120. func (s PairSlice) Len() int {
  121. return len(s)
  122. }
  123. func (s PairSlice) Swap(i, j int) {
  124. s[i], s[j] = s[j], s[i]
  125. }
  126. func (s PairSlice) Less(i, j int) bool {
  127. if s[i].value == s[j].value {
  128. return s[i].key < s[j].key
  129. }
  130. return s[i].value > s[j].value
  131. }
  132. /* User-defined map function */
  133. func Map(text []byte, output chan Pair) {
  134. word := ""
  135. for i := range text {
  136. if text[i] >= 'A' && text[i] <= 'Z' {
  137. text[i] += 'a' - 'A' // to lower case
  138. }
  139. if text[i] >= 'a' && text[i] <= 'z' || word != "" && text[i] == '\'' {
  140. word += string(text[i]) // extend word
  141. } else if word != "" {
  142. output <- Pair{word, 1}
  143. word = ""
  144. }
  145. }
  146. if word != "" {
  147. output <- Pair{word, 1}
  148. }
  149. output <- Pair{"", 0} // map finish
  150. }
  151. /* User-defined reduce function */
  152. func Reduce(key string, list []int) Pair {
  153. sum := 0
  154. for _, value := range list {
  155. sum += value
  156. }
  157. return Pair{key, sum} // reduce finish
  158. }
  159. func main() {
  160. config := Config{
  161. path: "../testdata", // directory that stores data
  162. mapCnt: 4, // number of mapper
  163. reduceCnt: 4, // number of reducer
  164. mapFunc: Map, // map function
  165. combineFunc: Reduce, // combine function
  166. reduceFunc: Reduce} // reduce function
  167. answer, err := Run(config)
  168. if err != nil {
  169. fmt.Println(err)
  170. return
  171. }
  172. sort.Sort(answer)
  173. for _, pair := range answer {
  174. fmt.Printf("%s\t%d\n", pair.key, pair.value)
  175. }
  176. }