Jestem bardzo nowa w Apache Spark. Chciałbym skupić się na podstawowej specyfikacji Spark API i chcę zrozumieć i napisać kilka programów przy pomocy Sparka API. Napisałem program java za pomocą Apache Spark do implementacji koncepcji Joins.Apache Spark Dołącza do przykładu z Javą
Kiedy używać Left Outer Join - leftOuterJoin() lub prawe sprzężenie zewnętrzne - rightOuterJoin(), obie są dwie metody zwracania JavaPairRDD który zawiera specjalny rodzaj Opcje Google. Ale nie wiem, jak wyodrębnić oryginalne wartości z typu opcjonalnego.
W każdym razie chciałbym wiedzieć, czy mogę użyć tych samych metod łączenia, które zwracają dane w moim własnym formacie. Nie znalazłem żadnego sposobu, aby to zrobić. Znaczenie ma to, że kiedy używam Apache Sparka, nie jestem w stanie dostosować kodu w moim własnym stylu, ponieważ już dano wszystkie predefiniowane rzeczy.
Proszę znaleźć kod poniżej
my 2 sample input datasets
customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter
and
trasaction_data.txt
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit
Oto mój kod Java
**SparkJoins.java:**
public class SparkJoins {
@SuppressWarnings("serial")
public static void main(String[] args) throws FileNotFoundException {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));
JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) {
String[] customerSplit = s.split(",");
return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
}
}).distinct();
JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String s) {
String[] transactionSplit = s.split(",");
return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]);
}
});
//Default Join operation (Inner join)
JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
System.out.println("Joins function Output: "+joinsOutput.collect());
//Left Outer join operation
JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect());
//Right Outer join operation
JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect());
sc.close();
}
}
I tu wyjście, które otrzymuję
Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]
LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]
RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]
Używam tego programu na Platforma Windows
Należy przestrzegać powyższej wyjście i mi pomóc w wyodrębnieniu wartości z opcjonalnego typu
góry dzięki
Dlaczego nie używać Scala zamiast tego? – maasg
Hi @maasg, jestem w zasadzie programistą Java. Naprawdę nie znam Scala .. Ale myślę, że Apache Spark jest najbardziej odpowiedni dla programowania Scala niż Java. –
@ShekarPatel Czy możesz zaktualizować swój kod w jaki sposób usunąłeś ten Opcjonalny .. który będzie pomocny dla innych. – Shankar