11

Chciałbym napisać koder dla typu Row w DataSet, dla operacji na mapie, którą wykonuję. Zasadniczo nie rozumiem, jak pisać enkodery.Enkoder dla typu wiersza Zestaw danych iskier

Poniżej jest przykład działania mapy:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() { 
      @Override 
      public Iterator<String> call(Row row) throws Exception { 

       ArrayList<String> obj = //some map operation 
       return obj.iterator(); 
      } 
     },Encoders.STRING()); 

Rozumiem, że zamiast Encoder ciąg musi być zapisana następująco:

Encoder<Row> encoder = new Encoder<Row>() { 
     @Override 
     public StructType schema() { 
      return join.schema(); 
      //return null; 
     } 

     @Override 
     public ClassTag<Row> clsTag() { 
      return null; 
     } 
    }; 

Jednak ja nie rozumiem clsTag() w enkoderze i próbuję znaleźć działający przykład, który może zdystansować coś podobnego (np. koder dla typu wiersza)

Edytuj - To nie jest kopia zadanego pytania: Encoder error while trying to map dataframe row to updated row jako odpowiedź mówi o używaniu Spark 1.x w Spark 2.x (nie robię tego), również szukam kodera dla rzędu klasy zamiast rozwiązać błąd. W końcu szukałem rozwiązania w Javie, a nie w Scali.

Odpowiedz

9

Odpowiedzią jest użycie RowEncoder i schematu zestawu danych przy użyciu TypeStruct.

Poniżej jest przykład roboczych od operacji flatmap rekordów:

StructType structType = new StructType(); 
    structType = structType.add("id1", DataTypes.LongType, false); 
    structType = structType.add("id2", DataTypes.LongType, false); 

    ExpressionEncoder<Row> encoder = RowEncoder.apply(structType); 

    Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() { 
     @Override 
     public Iterator<Row> call(Row row) throws Exception { 
      // a static map operation to demonstrate 
      List<Object> data = new ArrayList<>(); 
      data.add(1l); 
      data.add(2l); 
      ArrayList<Row> list = new ArrayList<>(); 
      list.add(RowFactory.create(data.toArray())); 
      return list.iterator(); 
     } 
    }, encoder); 
+0

nie powinno to nie w trybie klastra, ponieważ ArrayList nie jest można serializować – user482963