ההבדל בין reduceByKey ו- groupByKey

Difference Between Reducebykey



בניצוץ, אנו יודעים שהכל נעשה על פי RDD. בשימוש, ל- RDD יש פורמט מאוד מיוחד ושימושי מאוד - זוג RDD, כלומר, כל שורה של RDD היא בצורה של (מפתח, ערך). פורמט זה דומה מאוד לסוג המילון של פייתון, מה שמקל על ביצוע עיבוד למפתח.

עבור הצורה המיוחדת של RDD זוג, פעולות נוחות רבות מוגדרות ניצוץ. כיום אנו מציגים בעיקר את reducByKey ו- groupByKey, כי נסביר בהמשך. כיצד ליישם את פונקציית group_concat ב- SQL בניצוץ? 》 נעשה שימוש בשתי פעולות אלה.



ראשית, בדוק כיצד האתר הרשמי הניצוץ [1] מסביר:



reduceByKey (func, numPartitions = אין)



למזג את הערכים עבור כל מקש באמצעות פונקציית הפחתה אסוציאטיבית. פעולה זו תבצע גם את המיזוג באופן מקומי על כל מיפוי לפני שתשלח תוצאות למפחית, בדומה ל'משלב 'ב- MapReduce. הפלט יתחלק עם מחיצות חשיש עם מחיצות numPartitions, או ברמת ברירת המחדל של מקבילות אם לא צוין מספר numPartitions.

כלומר, reduceByKey משמש לביצוע פעולות מיזוג על מספר ערכים התואמים לכל מפתח. הדבר החשוב ביותר הוא שהוא יכול לבצע פעולת מיזוג באופן מקומי, וניתן להתאים את פעולת המיזוג לפי פונקציה.

groupByKey (numPartitions = אין)



קיבץ את הערכים עבור כל מקש ב- RDD לרצף יחיד. מחיצות Hash את ה- RDD שנוצר עם מחיצות numPartitions. הערה: אם אתה מקבץ על מנת לבצע צבירה (כגון סכום או ממוצע) על כל מקש, שימוש ב- reducByKey או aggregateByKey יספק ביצועים טובים בהרבה.

כלומר, groupByKey פועל גם על כל מקש, אך מייצר רק רצף אחד. צריך לשים לב במיוחד למילים ב'הערה ', זה אומר לנו: אם אתה צריך לבצע את פעולת הצבירה ברצף (שים לב שה- groupByKey עצמו לא יכול להתאים אישית את פונקציית הפעולה), עדיף לבחור בהפחתת ByKey / aggregateByKey. הסיבה לכך היא כי groupByKey לא יכול להתאים אישית את הפונקציה. עלינו ליצור RDD עם groupByKey תחילה, ואז נוכל להשתמש בפונקציה זו כדי לבצע פעולת פונקציה מותאמת אישית ב- RDD זה.

על מנת להבין טוב יותר את הפסקה שלעיל, אנו משתמשים בשתי דרכים שונות לחישוב מספר המילים [2]:

[java] נוף רגיל עותק

  1. מילים ואל = מערך ('אחד', 'שניים', 'שניים', 'שלוש', 'שלוש', 'שלוש')
  2. val wordPairsRDD = sc.parallelize (מילים) .מפה (word => (מילה, 1))
  3. val wordCountsWithReduce = wordPairsRDD.reduceByKey (_ + _)
  4. val wordCountsWithGroup = wordPairsRDD.groupByKey (). מפה (t => (t._1, t._2.sum))

WordCountsWithReduce ו- wordCountsWithGroup שהושגו לעיל זהים לחלוטין, אך הפעולות הפנימיות שלהם שונות.

(1) כאשר נעשה שימוש ב- reducByKeyt, Spark יכול לשלב את הנתונים שיופלטו עם מפתח משותף לפני העברת נתונים בכל מחיצה. השתמש באיור למטה כדי להבין מה קורה ב- reducByKey. שימו לב כיצד משלב אותו מפתח באותה מכונה לפני העברת זוג הנתונים (פונקציית lamdba ב- reduceByKey). לאחר מכן נקראת פונקציית lamdba בכל אזור כדי להפחית את כל הערכים לתוצאה סופית. התהליך כולו הוא כדלקמן:

(2) בעת שימוש ב- groupByKey, מכיוון שהוא אינו מקבל את הפונקציה, הניצוץ יכול רק להזיז את כל זוגות ערך המפתח תחילה. התוצאה של זה היא שהתקורה בין צמתים באשכול גדולה, וכתוצאה מכך עיכוב שידור. . התהליך כולו הוא כדלקמן:

לכן, reduceByKey טוב יותר מאשר groupByKey בעת ביצוע חישובים מורכבים על נתונים גדולים.

כמו כן, אם מדובר רק בעיבוד קבוצתי, הפונקציה הבאה צריכה להיות עדיפה על פני groupByKey:
(1), combineByKey משלב נתונים, אך סוג הנתונים אחרי השילוב שונה מסוג זמן הקלט.
(2), foldByKey משלב את כל הערכים של כל מקש, המשמש בפונקציות מדורגות ו'ערך אפס '.

לבסוף, ערוך מבוא לפונקציה ב- reducByKey:

אם אתה משתמש בניצוץ בפייתון, אז ספרייה שימושית מאוד: אופרטור [3], שניתן להשתמש בו כדי לכלול: פונקציית השוואת גודל, פונקציית פעולה לוגית, פונקציית פעולה מתמטית, פונקציית פעולת רצף וכן הלאה. ניתן לקרוא לפונקציות אלה ישירות מ'מייבא מפעיל * ', ולהעביר את שם הפונקציה ישירות כפרמטר להפחתת ByKey. כדלהלן:

[פִּיתוֹן] נוף רגיל עותק

  1. מהיבוא מפעיל להוסיף
  2. rdd = sc.parallelize ([('a', 1), ('b', 1), ('a', 1)])
  3. מיין (rdd.reduceByKey (הוסף) .collect ())
  4. [('a', 2), ('b', 1)]

להלן הסבר מפורט יותר על קוד המקור הנוסף.

העברה מ: https://blog.csdn.net/ZMC921/article/details/75098903

ראשית, כולם צריכים לעבור את הדשדוש, groupByKey לא ימזג את הדשדוש המקורי בין דשדוש השיטות. ה- reducByKey ימוזג לפני הדשדוש, מה שמקטין את העברת ה- IO של הדשדוש, כך שהיעילות גבוהה יותר.

מקרה:

[מישור] נוף רגיל עותק

  1. אובייקט GroupyKeyAndReduceByKeyDemo {
  2. def main (טענות: Array [String]): Unit = {
  3. Logger.getLogger ('org'). SetLevel (Level.WARN)
  4. val config = חדש SparkConf (). setAppName ('GroupyKeyAndReduceByKeyDemo'). setMaster ('local')
  5. val sc = SparkContext חדש (config)
  6. val arr = Array ('val config', 'val arr')
  7. val socketDS = sc.parallelize (arr) .flatMap (_. split ('')). מפה ((_, 1))
  8. // ההבדל בין groupByKey ל- reduceByKey:
  9. / / כולם עוברים את הדשדוש, groupByKey לא ימזג את הדשדוש המקורי בין השיטות דשדוש,
  10. // reduceByKey יתמזג לפני הדשדוש, מה שמקטין את העברת ה- io של הדשדוש, כך שהיעילות גבוהה יותר.
  11. socketDS.groupByKey (). מפה (tuple => (tuple._1, tuple._2.sum)). foreach (x => {
  12. println (x._1 + '+ x._2)
  13. })
  14. println ('----------------------')
  15. socketDS.reduceByKey (_ + _). foreach (x => {
  16. println (x._1 + '+ x._2)
  17. })
  18. sc.stop ()
  19. }
  20. }

שנית, ישנם שלושה סוגים של groupByKey

הצג את group groupByKey () מיישם groupByKey (defaultPartitioner (עצמי))

[java] נוף רגיל עותק

  1. / **
  2. * קיבצו את הערכים עבור כל מקש ב- RDD לרצף יחיד. מחיצות האש את
  3. * RDD וכתוצאה מכך עם רמת המחיצה / מקבילות הקיימת. סידור האלמנטים
  4. * בתוך כל קבוצה לא מובטחת, ואף עשויה להיות שונה בכל פעם שה- RDD שנוצר
  5. * העריך.
  6. *
  7. * @ הערה פעולה זו עשויה להיות יקרה מאוד. אם אתה מקבץ על מנת לבצע
  8. * צבירה (כגון סכום או ממוצע) על כל מקש, באמצעות 'PairRDDFunctions.aggregateByKey'
  9. * או 'PairRDDFunctions.reduceByKey' יספקו ביצועים טובים בהרבה.
  10. * /
  11. def groupByKey (): RDD [(K, Iterable [V])] = self.withScope {
  12. groupByKey (defaultPartitioner (עצמי))
  13. }

צפה במקור groupByKey (numPartitions: Int) מיישם groupByKey (HashPartitioner חדש (numPartitions))

[java] נוף רגיל עותק

  1. / **
  2. * קיבצו את הערכים עבור כל מקש ב- RDD לרצף יחיד. מחיצות האש את
  3. * RDD וכתוצאה מכך למחיצות 'numPartitions'. סידור האלמנטים בפנים
  4. * כל קבוצה אינה מובטחת, ואף עשויה להיות שונה בכל פעם שמעריכים את ה- RDD המתקבל.
  5. *
  6. * @ הערה פעולה זו עשויה להיות יקרה מאוד. אם אתה מקבץ על מנת לבצע
  7. * צבירה (כגון סכום או ממוצע) על כל מקש, באמצעות 'PairRDDFunctions.aggregateByKey'
  8. * או 'PairRDDFunctions.reduceByKey' יספקו ביצועים טובים בהרבה.
  9. *
  10. * הערה @ כפי שמוטמע כרגע, groupByKey חייב להיות מסוגל להחזיק את כל זוגות ערך המפתח לכל אחד מהם
  11. * מקש בזיכרון. אם למפתח יש יותר מדי ערכים, הוא יכול לגרום ל- 'OutOfMemoryError'.
  12. * /
  13. def groupByKey (numPartitions: Int): RDD [(K, Iterable [V])] = self.withScope {
  14. groupByKey (HashPartitioner חדש (numPartitions))
  15. }

למעשה, השניים לעיל מיושמים groupByKey (מחיצה: מחיצה)

[java] נוף רגיל עותק

  1. / **
  2. * קיבצו את הערכים עבור כל מקש ב- RDD לרצף יחיד. מאפשר לשלוט ב-
  3. * מחיצה של צמד ערכי המפתח RDD וכתוצאה מכך על ידי העברת מחיצה.
  4. * סידור האלמנטים בתוך כל קבוצה אינו מובטח, ואף עשוי להיות שונה
  5. * בכל פעם שמעריכים את ה- RDD שנוצר.
  6. *
  7. * @ הערה פעולה זו עשויה להיות יקרה מאוד. אם אתה מקבץ על מנת לבצע
  8. * צבירה (כגון סכום או ממוצע) על כל מקש, באמצעות 'PairRDDFunctions.aggregateByKey'
  9. * או 'PairRDDFunctions.reduceByKey' יספקו ביצועים טובים בהרבה.
  10. *
  11. * הערה @ כפי שמוטמע כרגע, groupByKey חייב להיות מסוגל להחזיק את כל זוגות ערך המפתח לכל אחד מהם
  12. * מקש בזיכרון. אם למפתח יש יותר מדי ערכים, הוא יכול לגרום ל- 'OutOfMemoryError'.
  13. * /
  14. def groupByKey (מחיצה: מחיצה): RDD [(K, Iterable [V])] = self.withScope {
  15. // groupByKey לא צריך להשתמש בשילוב בצד המפה מכיוון שהשילוב בצד המפה אינו עושה זאת
  16. // צמצם את כמות הנתונים הנדששים ודורש להכניס את כל נתוני צד המפה
  17. // לטבלת חשיש, מה שמוביל לאובייקטים נוספים בסוג הישן.
  18. val createCombiner = (v: V) => CompactBuffer (v)
  19. val mergeValue = (buf: CompactBuffer [V], v: V) => buf + = v
  20. val mergeCombiners = (c1: CompactBuffer [V], c2: CompactBuffer [V]) => c1 ++ = c2
  21. val bufs = combineByKeyWithClassTag [CompactBuffer [V]] (
  22. createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  23. bufs.asInstanceOf [RDD [(K, Iterable [V])]]
  24. }

GroupByKey (partitioner: Partitioner) מיישם combineByKeyWithClassTag, כך שניתן לומר כי groupByKey הוא למעשה היישום של combineByKeyWithClassTag, אך היישום שונה.

שלישית, ישנן שלוש דרכים להציג שוב את reducByKey.


[java] נוף רגיל עותק

  1. / **
  2. * מיזג את הערכים עבור כל מקש באמצעות פונקציית צמצום אסוציאטיבית וקומוטטיבית. זה יהיה
  3. * בצע גם את המיזוג באופן מקומי על כל מיפוי לפני שתשלח תוצאות למפחית, באופן דומה
  4. * ל'משלב 'ב- MapReduce. הפלט יתחלק עם המחיצה הקיימת /
  5. * רמת מקבילות.
  6. * /
  7. def reduceByKey (func: (V, V) => V): RDD [(K, V)] = self.withScope {
  8. reduceByKey (defaultPartitioner (עצמי), func)
  9. }

[java] נוף רגיל עותק

  1. / **
  2. * מיזג את הערכים עבור כל מקש באמצעות פונקציית צמצום אסוציאטיבית וקומוטטיבית. זה יהיה
  3. * בצע גם את המיזוג באופן מקומי על כל מיפוי לפני שתשלח תוצאות למפחית, באופן דומה
  4. * ל'משלב 'ב- MapReduce. הפלט יהיה מחולק hash עם מחיצות numPartitions.
  5. * /
  6. def reduceByKey (func: (V, V) => V, numPartitions: Int): RDD [(K, V)] = self.withScope {
  7. reduceByKey (HashPartitioner חדש (numPartitions), פונקציה)
  8. }

[java] נוף רגיל עותק

  1. / **
  2. * מיזג את הערכים עבור כל מקש באמצעות פונקציית צמצום אסוציאטיבית וקומוטטיבית. זה יהיה
  3. * בצע גם את המיזוג באופן מקומי על כל מיפוי לפני שתשלח תוצאות למפחית, באופן דומה
  4. * ל'משלב 'ב- MapReduce.
  5. * /
  6. def reduceByKey (מחיצה: מחיצה, פונקציה: (V, V) => V): RDD [(K, V)] = self.withScope {
  7. combineByKeyWithClassTag [V] ((v: V) => v, func, func, partitioner)
  8. }

לא קשה לברר זאת על ידי התבוננות בשלושת מקשי ה- reducByKeys הללו. השניים הראשונים הם היישום האחרון. האחרון הוא הטמעת combineByKeyWithClassTag.

### groupByKey מיושם כך

combineByKeyWithClassTag [CompactBuffer [V]] (createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)

### reduceByKey מיושם כך

combineByKeyWithClassTag [V] ((v: V) => v, func, func, partitioner)

בהשוואה לאמור לעיל, groupByKey מגדיר את mapSideCombine = false ולא מתמזג בצד המפה, כלומר הוא אינו מתמזג לפני דשדוש. ה- ReduceByKey אינו מוגדר.

האם reducByKey מוזג כברירת מחדל? ? ? ?

רביעית, הבא, בואו נסתכל מקרוב על combineByKeyWithClassTag

[java] נוף רגיל עותק

  1. / **
  2. * :: ניסיוני ::
  3. * פונקציה כללית לשילוב האלמנטים עבור כל מקש באמצעות סט צבירה מותאם אישית
  4. * פונקציות. הופך RDD [(K, V)] לתוצאה מסוג RDD [(K, C)], עבור 'סוג משולב' C
  5. *
  6. * המשתמשים מספקים שלוש פונקציות:
  7. *
  8. * - 'createCombiner', שהופך V ל- C (למשל, יוצר רשימה של רכיב אחד)
  9. * - 'mergeValue', למזג V ל- C (למשל, להוסיף אותו בסוף הרשימה)
  10. * - 'mergeCombiners', כדי לשלב בין שני C 'לאחד.
  11. *
  12. * בנוסף, משתמשים יכולים לשלוט על מחיצת הפלט RDD, והאם לבצע
  13. * צבירה בצד המפה (אם ממפה יכול לייצר מספר פריטים עם אותו מפתח).
  14. *
  15. * הערה V ו- C @ יכולים להיות שונים - למשל, אפשר לקבץ RDD מסוג
  16. * (Int, Int) לתוך RDD מסוג (Int, Seq [Int]).
  17. * /
  18. @נִסיוֹנִי
  19. def combineByKeyWithClassTag [C] (
  20. createCombiner: V => C,
  21. mergeValue: (C, V) => C,
  22. mergeCombiners: (C, C) => C,
  23. מחיצות: מחיצות,
  24. mapSideCombine: בוליאני = נכון,
  25. serializer: Serializer = null) (ct implicit: ClassTag [C]): RDD [(K, C)] = self.withScope {
  26. דורש (mergeCombiners! = null, יש להגדיר 'mergeCombiners') // נדרש החל מ- Spark 0.9.0
  27. אם (keyClass.isArray) {
  28. אם (mapSideCombine) {
  29. זרק SparkException חדש ('לא יכול להשתמש בצד המפה בשילוב עם מקשי מערך.')
  30. }
  31. אם (partitioner.isInstanceOf [HashPartitioner]) {
  32. זרק SparkException חדש ('HashPartitioner לא יכול לחיצה על מפתחות מערך.')
  33. }
  34. }
  35. צובר Val = צבר חדש [K, V, C] (
  36. self.context.clean (createCombiner),
  37. self.context.clean (mergeValue),
  38. self.context.clean (mergeCombiners))
  39. אם (מחיצה עצמית == חלק (מחיצה)) {
  40. self.mapPartitions (iter => {
  41. val context = TaskContext.get ()
  42. InterruptibleIterator חדש (הקשר, aggregator.combineValuesByKey (איטר, הקשר))
  43. }, preservesPartitioning = true)
  44. } אחר {
  45. ShuffledRDD חדש [K, V, C] (עצמי, מחיצות)
  46. .setSerializer (סידורי)
  47. .setAggregator (צובר)
  48. .setMapSideCombine (mapSideCombine)
  49. }
  50. }

על ידי התבוננות ב- combineByKeyWithClassTag, נמצא ש- reducByKey מוזג כברירת מחדל בצד המפה, כלומר הוא מוזג לפני דשדוש. אם חלק מהנתונים מוזגו, כתיבת הצפה בדחיפה מקטינה את ה- IO של הדיסק, כך ש- ReduceByKey יהיה מהיר יותר.