2017-01-16 23 views
7

Czy istnieje odpowiednik Pandas Melt Function w Apache Spark w PySpark lub przynajmniej w Scala?Jak stopić Spark DataFrame?

Do tej pory korzystałem z próbnego zestawu danych w pythonie, a teraz chcę używać Sparka do całego zestawu danych.

Z góry dziękuję.

+0

Sprawdź to: http://chappers.github.io/web%20micro%20log/2016/03/07/implementing-simple-melt-function-for-pyspark/ – MYGz

+0

Przepraszamy za opóźnioną odpowiedź ... Błąd niepowodzenia zadania, nawet dla małego przykładowego zestawu danych (rdd) utworzonego za pomocą rdd = sc.parallelize ([("x", 1,4), ("y", 3,5), ("z", 2 , 6)]) –

Odpowiedz

12

Nie ma wbudowanej funkcji (jeśli pracujesz z obsługą SQL i Hive, możesz używać stack function, ale nie jest ona widoczna w Sparku i nie ma natywnej implementacji), ale jej własna rola jest banalna. Wymagane import:

from pyspark.sql.functions import array, col, explode, lit, struct 
from pyspark.sql import DataFrame 
from typing import Iterable 

Przykład wdrożenia:

def melt(
     df: DataFrame, 
     id_vars: Iterable[str], value_vars: Iterable[str], 
     var_name: str="variable", value_name: str="value") -> DataFrame: 
    """Convert :class:`DataFrame` from wide to long format.""" 

    # Create array<struct<variable: str, value: ...>> 
    _vars_and_vals = array(*(
     struct(lit(c).alias(var_name), col(c).alias(value_name)) 
     for c in value_vars)) 

    # Add to the DataFrame and explode 
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) 

    cols = id_vars + [ 
      col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]] 
    return _tmp.select(*cols) 

A niektóre testy (na podstawie Pandas doctests):

import pandas as pd 

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'}, 
        'B': {0: 1, 1: 3, 2: 5}, 
        'C': {0: 2, 1: 4, 2: 6}}) 

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C']) 
A variable value 
0 a  B  1 
1 b  B  3 
2 c  B  5 
3 a  C  2 
4 b  C  4 
5 c  C  6 
sdf = spark.createDataFrame(pdf) 
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show() 
+---+--------+-----+ 
| A|variable|value| 
+---+--------+-----+ 
| a|  B| 1| 
| a|  C| 2| 
| b|  B| 3| 
| b|  C| 4| 
| c|  B| 5| 
| c|  C| 6| 
+---+--------+-----+ 

Uwaga: Do użytku ze starszymi wersjami Pythona usuń adnotacje typu.

3

Przyszedłem przez to pytanie w moich poszukiwaniach realizacji stopu w Spark dla scala. Publikowanie mojego portu Scala na wypadek, gdyby ktoś również natknął się na to.

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{DataFrame} 
/** Extends the [[org.apache.spark.sql.DataFrame]] class 
* 
* @param df the data frame to melt 
*/ 
implicit class DataFrameFunctions(df: DataFrame) { 

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format. 
    * 
    * melt is (kind of) the inverse of pivot 
    * melt is currently (02/2017) not implemented in spark 
    * 
    * @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html) 
    * @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark 
    * 
    * @todo method overloading for simple calling 
    * 
    * @param id_vars the columns to preserve 
    * @param value_vars the columns to melt 
    * @param var_name the name for the column holding the melted columns names 
    * @param value_name the name for the column holding the values of the melted columns 
    * 
    */ 

    def melt(
      id_vars: Seq[String], value_vars: Seq[String], 
      var_name: String = "variable", value_name: String = "value") : DataFrame = { 

     // Create array<struct<variable: str, value: ...>> 
     val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*) 

     // Add to the DataFrame and explode 
     val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) 

     val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }} 

     return _tmp.select(cols: _*) 

    } 
} 

Ponieważ nie jestem tak zaawansowany, biorąc pod uwagę scala, jestem pewien, że jest miejsce na poprawę. Wszelkie komentarze są mile widziane.