2015-10-22 5 views
11

Mam Spark 1.5.0 DataFrame z mieszanką null i puste ciągi w tej samej kolumnie. Chcę przekonwertować wszystkie puste ciągi we wszystkich kolumnach na null (None, w języku Python). DataFrame może mieć setki kolumn, więc staram się unikać zakodowanych na sztywno manipulacji dla każdej kolumny.Zamień puste ciągi na wartości Brak/puste w DataFrame

Zobacz moją próbę poniżej, co powoduje błąd.

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

## Create a test DataFrame 
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')]) 
testDF.show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## | | 2| 
## |null|null| 
## +----+----+ 

## Try to replace an empty string with None/null 
testDF.replace('', None).show() 
## ValueError: value should be a float, int, long, string, list, or tuple 

## A string value of null (obviously) doesn't work... 
testDF.replace('', 'null').na.drop(subset='col1').show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## |null| 2| 
## +----+----+ 
+0

@palsch, Nie, nie zwraca listy. Zwraca obiekt DataFrame. Zaktualizowałem pytanie linkiem do dokumentacji Sparka. – dnlbrky

+2

@palsch to nie jest ogólne pytanie Pythona! Spark DataFrames to rozproszona struktura danych wykorzystywana ogólnie w celu umożliwienia analizy dużych zbiorów danych. Więc twoje rozwiązanie nie pasuje. – eliasah

+1

@eliasah Prawdę powiedziane Pythonic 'lambda x: Brak, jeśli nie x else x' owinięty' udf' działałby dobrze ... – zero323

Odpowiedz

15

Jest to tak proste, jak to:

from pyspark.sql.functions import col, when 

def blank_as_null(x): 
    return when(col(x) != "", col(x)).otherwise(None) 

dfWithEmptyReplaced = testDF.withColumn("col1", blank_as_null("col1")) 

dfWithEmptyReplaced.show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## |null| 2| 
## |null|null| 
## +----+----+ 

dfWithEmptyReplaced.na.drop().show() 
## +----+----+ 
## |col1|col2| 
## +----+----+ 
## | foo| 1| 
## +----+----+ 

Jeśli chcesz wypełnić wiele kolumn można na przykład ograniczyć:

to_convert = set([...]) # Some set of columns 

reduce(lambda df, x: df.withColumn(x, blank_as_null(x)), to_convert, testDF) 

lub skorzystać ze zrozumieniem:

exprs = [ 
    blank_as_null(x).alias(x) if x in to_convert else x for x in testDF.columns] 

testDF.select(*exprs) 

Jeśli chcesz specjalnie pracować oceń na polach tekstowych, sprawdź the answer przez robin-loxley.

+0

Dzięki @ zero323. Czy twoja odpowiedź może zostać rozszerzona, aby obsłużyć wiele kolumn automatycznie i wydajnie? Być może lista wszystkich nazw kolumn, wygenerować podobny kod jako odpowiedź dla każdej kolumny, a następnie ocenić kod? – dnlbrky

+0

Nie widzę żadnego powodu, dla którego nie możesz. DataFrame są leniwie oceniane, a reszta to zwykły Python. W edycji znajdziesz kilka opcji. – zero323

+0

Przyjmuję tę odpowiedź, ale czy mógłbyś najpierw dodać bit z @RobinLoxley? Lub, jeśli nie masz nic przeciwko, mogę edytować twoją odpowiedź. – dnlbrky

8

Moje rozwiązanie jest znacznie lepsze niż wszystkie rozwiązania I'v widziałem do tej pory, które radzą sobie z tak wielu dziedzinach, jak chcesz, zobacz mało funkcji jak:

// Replace empty Strings with null values 
    private def setEmptyToNull(df: DataFrame): DataFrame = { 
    val exprs = df.schema.map { f => 
     f.dataType match { 
     case StringType => when(length(col(f.name)) === 0, lit(null: String).cast(StringType)).otherwise(col(f.name)).as(f.name) 
     case _ => col(f.name) 
     } 
    } 

    df.select(exprs: _*) 
    } 

można łatwo przerobić funkcja powyżej w Pythonie.

dowiedziałem ten trick z @liancheng

6

Wystarczy dodać na górze zero323 tych i odpowiedzi soulmachine użytkownika. Aby przekonwertować wszystkie pola StringType.

from pyspark.sql.types import StringType 
string_fields = [] 
for i, f in enumerate(test_df.schema.fields): 
    if isinstance(f.dataType, StringType): 
     string_fields.append(f.name)