2016-06-30 8 views
11

Próbuję posortować RDD według wartości, a jeśli wiele wartości jest równych, potrzebuję tych wartości przez klucz leksykograficzny.Sortowanie JavaPairRDD najpierw według wartości, a następnie według klucza

Kod:

JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long >() { 

    @Override 
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < String, Long > (t._1, t._2.count); 
    } 
}); 

co zrobiłem do tej pory jest to, używając takeOrdered i zapewniając CustomComperator, ale ponieważ takeOrdered nie może obsługiwać duże ilości danych, gdy uruchomienie kodu utrzymuje wyjściu (zjada dużo pamięci, że system operacyjny nie może obsłużyć):

List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long >() { 

    @Override 
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < String, Long > (t._1, t._2.count); 
    } 
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP); 

Comperator:

static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable { 
     private static final long serialVersionUID = 1L; 

     private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator(); 

     @Override 
     public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) { 
      if (o1._2.compareTo(o2._2) == 0) { 
       return o1._1.compareTo(o2._1); 
      } 
      return -o1._2.compareTo(o2._2); 
     } 
} 

BŁĄD:

16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s 

Jak można sortować tej RDD? W jaki sposób podążysz za wartością TopKMovies, aw przypadku tekstów równościowych leksykograficznie?

Dzięki.

+0

można zapewnić ślad stosu (Jeśli istnieje jakikolwiek?). Ponieważ wspomniałeś, że może to być problem z pamięcią, ale komunikat o błędzie nie pozwala zobaczyć, co dokładnie się stało. – Serhiy

+0

@Serhiy Przypuszczam, że jest to problem z pamięcią, ponieważ operacja takeOrdered zajmuje dużo czasu, ponieważ obsługuje dużą ilość danych w trybie Distributed, otrzymałem kod Exit: 137 i Exit code: 1. podejście do sortowania w inny sposób zdecydowanie rozwiąże problem. –

+0

Czy próbowałeś ponownie przeprowadzić podział na partycje danych? Po mapowaniu na parę można ponownie podzielić partycję zaraz potem. – Serhiy

Odpowiedz

3

rozwiązać problemu przy użyciu sortByKey z komparatora & partycje, po Maping się <String, Long> PairRDD do < Tuple2<String,Long> , Long> PairRDD

JavaPairRDD <Tuple2<String,Long>, Long> sortedRdd = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , Tuple2<String,Long>, Long >() { 

    @Override 
    public Tuple2 < Tuple2<String,Long>, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < Tuple2<String,Long>, Long > (new Tuple2<String,Long>(t._1,t._2.count), t._2.count); 
    } 
}).sortByKey(new TupleMapLongComparator(), true, 100); 


JavaPairRDD <String,Long> sortedRddToPairs = sortedRdd.mapToPair(new PairFunction<Tuple2<Tuple2<String,Long>,Long>, String, Long>() { 

    @Override 
    public Tuple2<String, Long> call(
      Tuple2<Tuple2<String, Long>, Long> t) throws Exception { 
     return new Tuple2 < String, Long > (t._1._1, t._1._2); 
    } 

}); 

komparatora:

private class TupleMapLongComparator implements Comparator<Tuple2<String,Long>>, Serializable { 
    @Override 
    public int compare(Tuple2<String,Long> tuple1, Tuple2<String,Long> tuple2) { 

     if (tuple1._2.compareTo(tuple2._2) == 0) { 
      return tuple1._1.compareTo(tuple2._1); 
     } 
     return -tuple1._2.compareTo(tuple2._2); 
    } 
}