Załóżmy, że mam DataFrame (którą odczytałem z CSV na HDFS) i chcę wytrenować na nim niektóre algorytmy poprzez MLlib. Jak przekonwertować wiersze na LabeledPoints lub w inny sposób wykorzystać MLlib na tym zestawie danych?Używanie DataFrame z MLlib
Odpowiedz
Zakładając, że używasz Scala:
Załóżmy, że uzyskanie DataFrame
następująco:
val results : DataFrame = sqlContext.sql(...)
Krok 1 call results.printSchema()
- to pokaże nie tylko kolumn w DataFrame i (to jest ważne) ich kolejność, ale także to, co Spark SQL uważa za ich typy. Gdy zobaczysz ten wynik, rzeczy stają się o wiele mniej tajemnicze.
Krok 2: Get RDD[Row]
z DataFrame
:
val rows: RDD[Row] = results.rdd
Krok 3: Teraz to tylko kwestia ciągnięcie cokolwiek pola zainteresowania cię z poszczególnych wierszy. W tym celu musisz znać pozycję każdego pola i jego typu na podstawie 0 i na szczęście uzyskałeś to wszystko w Kroku 1 powyżej. Na przykład, powiedzmy, że zrobił SELECT x, y, z, w FROM ...
i drukowanie schematu przyniosły
root
|-- x double (nullable = ...)
|-- y string (nullable = ...)
|-- z integer (nullable = ...)
|-- w binary (nullable = ...)
I powiedzmy, wszystko co chciał wykorzystać x
i z
. Można wyciągnąć je z RDD[(Double, Integer)]
się następująco:
rows.map(row => {
// x has position 0 and type double
// z has position 2 and type integer
(row.getDouble(0), row.getInt(2))
})
Stąd wystarczy użyć rdzenia Spark stworzyć odpowiednie obiekty MLlib. Sprawy mogą być nieco bardziej skomplikowane, jeśli SQL zwraca kolumny typu tablicowego, w takim przypadku będziesz musiał wywołać getList(...)
dla tej kolumny.
Zakładając, że używasz Java (Spark wersję 1.6.2):
Oto prosty przykład kodu Java przy użyciu DataFrame uczenia maszynowego.
ładuje JSON o następującej strukturze
[{ "znacznik" 1 "att2": 5,037089672359123 "att1": 2,4100883023159456}, ...]
dzieli dane do szkolenia i egzaminowania,
- pociągów model używając danych o pociągu
- zastosowania modelu do danych testowych i
- Stor es wyników.
Co więcej zgodnie z official documentation "API oparty na DataFrame to podstawowe API" dla MLlib od obecnej wersji 2.0.0. Więc możesz znaleźć kilka przykładów przy użyciu DataFrame.
Kod:
SparkConf conf = new SparkConf().setAppName("MyApp").setMaster("local[2]");
SparkContext sc = new SparkContext(conf);
String path = "F:\\SparkApp\\test.json";
String outputPath = "F:\\SparkApp\\justTest";
System.setProperty("hadoop.home.dir", "C:\\winutils\\");
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().json(path);
df.registerTempTable("tmp");
DataFrame newDF = df.sqlContext().sql("SELECT att1, att2, label FROM tmp");
DataFrame dataFixed = newDF.withColumn("label", newDF.col("label").cast("Double"));
VectorAssembler assembler = new VectorAssembler().setInputCols(new String[]{"att1", "att2"}).setOutputCol("features");
StringIndexer indexer = new StringIndexer().setInputCol("label").setOutputCol("labelIndexed");
// Split the data into training and test
DataFrame[] splits = dataFixed.randomSplit(new double[] {0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];
DecisionTreeClassifier dt = new DecisionTreeClassifier().setLabelCol("labelIndexed").setFeaturesCol("features");
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {assembler, indexer, dt});
// Train model
PipelineModel model = pipeline.fit(trainingData);
// Make predictions
DataFrame predictions = model.transform(testData);
predictions.rdd().coalesce(1,true,null).saveAsTextFile("justPlay.txt" +System.currentTimeMillis());
Nie wspomniano typ danych kolumnach ale jeśli są numeryczne (liczba całkowita, podwójne, itp) można użyć [VectorAssembler] (http: //spark.apache .org/docs/latest/ml-features.html # vectorassembler) do konwersji kolumn elementów w jedną kolumnę [Vector] (http://spark.apache.org/docs/latest/mllib-data-types.html) . – Ben