
Problem statement: the output order does not match the order of the given input

I am merging two datasets that have different column names in Apache Spark. After applying isin(), the row order in the dataset changes.

I also tried sort and orderBy, but neither worked.

Input 1 (data):

RowFactory.create("405-048011-62815", "CRC Industries"), 
RowFactory.create("630-0746","Dixon value"), 
RowFactory.create("4444-444","3M INdustries"), 
RowFactory.create("555-55","Dixon coupling valve") 

Input 2 (data2):

RowFactory.create("222-2222-5555", "Tata"), 
RowFactory.create("7777-88886","WestSide"), 
RowFactory.create("22222-22224","Reliance"), 
RowFactory.create("33333-3333","V industries") 


List<Row> data = Arrays.asList(
    RowFactory.create("405-048011-62815", "CRC Industries"),
    RowFactory.create("630-0746", "Dixon value"),
    RowFactory.create("4444-444", "3M INdustries"),
    RowFactory.create("555-55", "Dixon coupling valve"));

StructType schema = new StructType(new StructField[] {
    new StructField("label1", DataTypes.StringType, false, Metadata.empty()),
    new StructField("sentence1", DataTypes.StringType, false, Metadata.empty()) });

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 

List<String> listStrings = new ArrayList<String>(); 
listStrings.add("405-048011-62815"); 
listStrings.add("630-0746"); 
listStrings.add("4444-444"); 
listStrings.add("555-55"); 

Dataset<Row> matchFound1 = sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new))); 
matchFound1.show(); 


listStrings.clear(); 
listStrings.add("222-2222-5555"); 
listStrings.add("7777-88886"); 
listStrings.add("22222-22224"); 
listStrings.add("33333-3333"); 
StringIndexer indexer = new StringIndexer() 
    .setInputCol("label1") 
    .setOutputCol("label1Index1"); 
Dataset<Row> Dataset1 = indexer.fit(matchFound1).transform(matchFound1); 
Dataset1.show(); 


List<Row> data2 = Arrays.asList(
    RowFactory.create("222-2222-5555", "Tata"), 
    RowFactory.create("7777-88886","WestSide"), 
    RowFactory.create("22222-22224","Reliance"), 
    RowFactory.create("33333-3333","V industries")); 
StructType schema2 = new StructType(new StructField[] {
    new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
    new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) });

Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2); 

Dataset<Row> matchFound2 = sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new))); 
matchFound2.show(); 

StringIndexer indexer1 = new StringIndexer() 
    .setInputCol("label2") 
    .setOutputCol("label2Index1"); 
Dataset<Row> Dataset2 = indexer1.fit(matchFound2).transform(matchFound2); 
Dataset2.show(); 

Dataset<Row> Finalresult = Dataset1
    .join(Dataset2, Dataset1.col("label1Index1").equalTo(Dataset2.col("label2Index1")))
    .drop(Dataset1.col("label1Index1"))
    .drop(Dataset2.col("label2Index1"));
Finalresult.show(); 

Actual output:

+----------------+--------------------+-------------+------------+
|          label1|           sentence1|       label2|   sentence2|
+----------------+--------------------+-------------+------------+
|405-048011-62815|      CRC Industries|   33333-3333|V industries|
|        630-0746|         Dixon value|222-2222-5555|        Tata|
|        4444-444|       3M INdustries|   7777-88886|    WestSide|
|          555-55|Dixon coupling valve|  22222-22224|    Reliance|
+----------------+--------------------+-------------+------------+

Expected output:

+----------------+--------------------+-------------+------------+
|          label1|           sentence1|       label2|   sentence2|
+----------------+--------------------+-------------+------------+
|405-048011-62815|      CRC Industries|222-2222-5555|        Tata|
|        630-0746|         Dixon value|   7777-88886|    WestSide|
|        4444-444|       3M INdustries|  22222-22224|    Reliance|
|          555-55|Dixon coupling valve|   33333-3333|V industries|
+----------------+--------------------+-------------+------------+

Answer


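Instead of using StringIndexer, you can add a column of unique, monotonically increasing IDs with monotonically_increasing_id() and rebuild each DataFrame as shown below. Note that these IDs are guaranteed to be unique and increasing, but not consecutive, so they line up across two DataFrames only when both are partitioned the same way (for example, a single partition each):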

// requires: import static org.apache.spark.sql.functions.monotonically_increasing_id;
Dataset<Row> Test2 = Dataset2.withColumn("rowId2", monotonically_increasing_id());
Dataset<Row> Test1 = Dataset1.withColumn("rowId1", monotonically_increasing_id());

Then join the two datasets:

Dataset<Row> Finalresult = Test1.join(Test2 , Test1.col("rowId1").equalTo(Test2.col("rowId2")));
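
To see when the rowId1 = rowId2 join pairs rows by position, it may help to look at how the IDs are laid out. The Spark documentation describes the current implementation of monotonically_increasing_id() as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The following is only a plain-Java sketch of that layout, not Spark code; the class MonotonicIdSketch and its helpers idFor and idsFor are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (no Spark involved) of the ID layout documented for
// monotonically_increasing_id(): partition ID in the upper 31 bits,
// record number within the partition in the lower 33 bits.
class MonotonicIdSketch {

    // Hypothetical helper: ID of record number `recordNumber` in partition `partitionId`.
    static long idFor(int partitionId, long recordNumber) {
        return ((long) partitionId << 33) + recordNumber;
    }

    // Hypothetical helper: IDs for a dataset whose partitions hold the given row counts.
    static List<Long> idsFor(int... rowsPerPartition) {
        List<Long> ids = new ArrayList<>();
        for (int p = 0; p < rowsPerPartition.length; p++) {
            for (long r = 0; r < rowsPerPartition[p]; r++) {
                ids.add(idFor(p, r));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Four rows in a single partition: the IDs happen to be consecutive.
        System.out.println(idsFor(4));     // [0, 1, 2, 3]
        // The same four rows split 2 + 2 across two partitions.
        System.out.println(idsFor(2, 2));  // [0, 1, 8589934592, 8589934593]
    }
}
```

With small local datasets like the ones in the question, both DataFrames will often end up with matching layouts and the join lines up. If the two sides are partitioned differently, only some IDs match and rows drop out of the inner join. In that case one option is to derive consecutive positions first, for instance with row_number() over a window ordered by the ID column, or with zipWithIndex() on the underlying RDD, and join on those instead.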