2015-02-05 12 views
8

Jestem bardzo nowa w Apache Spark. Chciałbym skupić się na podstawowej specyfikacji Spark API i chcę zrozumieć i napisać kilka programów przy pomocy Sparka API. Napisałem program java za pomocą Apache Spark do implementacji koncepcji Joins.Apache Spark Dołącza do przykładu z Javą

Kiedy używać Left Outer Join - leftOuterJoin() lub prawe sprzężenie zewnętrzne - rightOuterJoin(), obie są dwie metody zwracania JavaPairRDD który zawiera specjalny rodzaj Opcje Google. Ale nie wiem, jak wyodrębnić oryginalne wartości z typu opcjonalnego.

W każdym razie chciałbym wiedzieć, czy mogę użyć tych samych metod łączenia, które zwracają dane w moim własnym formacie. Nie znalazłem żadnego sposobu, aby to zrobić. Znaczenie ma to, że kiedy używam Apache Sparka, nie jestem w stanie dostosować kodu w moim własnym stylu, ponieważ już dano wszystkie predefiniowane rzeczy.

Proszę znaleźć kod poniżej

my 2 sample input datasets 

customers_data.txt: 
4000001,Kristina,Chung,55,Pilot 
4000002,Paige,Chen,74,Teacher 
4000003,Sherri,Melton,34,Firefighter 

and 

trasaction_data.txt 
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit 
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit 
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash 
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit 
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit 

Oto mój kod Java

**SparkJoins.java:** 

public class SparkJoins { 

    @SuppressWarnings("serial") 
    public static void main(String[] args) throws FileNotFoundException { 
     JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local")); 
     JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt"); 
     JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] customerSplit = s.split(","); 
       return new Tuple2<String, String>(customerSplit[0], customerSplit[1]); 
      } 
     }).distinct(); 

     JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt"); 
     JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] transactionSplit = s.split(","); 
       return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]); 
      } 
     }); 

     //Default Join operation (Inner join) 
     JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs); 
     System.out.println("Joins function Output: "+joinsOutput.collect()); 

     //Left Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect()); 

     //Right Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect()); 

     sc.close(); 
    } 
} 

I tu wyjście, które otrzymuję

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))] 

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])] 

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])] 

Używam tego programu na Platforma Windows

Należy przestrzegać powyższej wyjście i mi pomóc w wyodrębnieniu wartości z opcjonalnego typu

góry dzięki

+0

Dlaczego nie używać Scala zamiast tego? – maasg

+0

Hi @maasg, jestem w zasadzie programistą Java. Naprawdę nie znam Scala .. Ale myślę, że Apache Spark jest najbardziej odpowiedni dla programowania Scala niż Java. –

+0

@ShekarPatel Czy możesz zaktualizować swój kod w jaki sposób usunąłeś ten Opcjonalny .. który będzie pomocny dla innych. – Shankar

Odpowiedz

8

Kiedy robisz lewe i prawe sprzężenie zewnętrzne sprzężenie zewnętrzne, może mieć wartości null. dobrze!

Powraca iskra Obiekt opcjonalny. po uzyskaniu tego wyniku możesz odwzorować ten wynik na swój własny format.

Twoja metoda może wykorzystywać metodę Optation do isPresent() do mapowania danych.

Oto przykład:

JavaPairRDD<String,String> firstRDD = .... 
JavaPairRDD<String,String> secondRDD =.... 
// join both rdd using left outerjoin 
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD); 


// mapping of join result 
JavaPairRDD<String, String> mappedRDD = rddWithJoin 
      .mapToPair(tuple -> { 
       if (tuple._2()._2().isPresent()) { 
        //do your operation and return 
        return new Tuple2<String, String>(tuple._1(), tuple._2()._1()); 
       } else { 
        return new Tuple2<String, String>(tuple._1(), "not present"); 
       } 
      }); 
+0

Dziękuję kolego .. Działa dobrze .. –

+0

@ sms_1190 jak zmapować ten wynik do naszego własnego formatu? również mam ten sam problem. – Shankar

+0

@ Shankar: Dodałem przykład w powyższej odpowiedzi. mappedRDD jest twoim własnym formatem. –