2016-12-14 12 views
6

Próbuję konwertować dane przechowywane w S3 jako pliki tekstowe JSON-per-line do strukturalnego, kolumnowego formatu, takiego jak ORC lub Parkiet na S3.Demultipleksowanie RDD na wiele tabel ORC

Pliki źródłowe zawierają dane wielu schematów (np. Żądanie HTTP, odpowiedź HTTP, ...), które należy przeanalizować w różnych danych typu Spark odpowiedniego typu.

Przykładowe schematy:

val Request = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false), 
    StructField("requestId", LongType), 
    StructField("requestMethod", StringType), 
    StructField("scheme", StringType), 
    StructField("host", StringType), 
    StructField("headers", MapType(StringType, StringType, valueContainsNull=false)), 
    StructField("path", StringType), 
    StructField("sessionId", StringType), 
    StructField("userAgent", StringType) 
)) 

    val Response = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false), 
    StructField("requestId", LongType), 
    StructField("contentType", StringType), 
    StructField("contentLength", IntegerType), 
    StructField("statusCode", StringType), 
    StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)), 
    StructField("responseDuration", DoubleType), 
    StructField("sessionId", StringType) 
)) 

mam tę część pracuje bez zarzutu, jednak stara się pisać dane z powrotem do S3 tak skutecznie, jak to możliwe wydaje się być kwestia atm.

Próbowałem 3 podejścia:

  1. muxPartitions z Silex projektu
  2. buforowanie analizowany wejście S3 i zapętlenie nad nim kilka razy
  3. co każdy program typu oddzielną partycję z RDD

W pierwszym przypadku JVM zabrakło pamięci, w drugim zaś skończyło się miejsce na dysku.

Trzeci test nie został jeszcze dokładnie przetestowany, ale nie wydaje się to efektywnym wykorzystaniem mocy obliczeniowej (ponieważ tylko jeden węzeł klastra (ten, na którym znajduje się ta partycja) faktycznie zapisywałby dane z powrotem do S3).

odpowiedni kod:

val allSchemes = Schemes.all().keys.toArray 

if (false) { 
    import com.realo.warehouse.multiplex.implicits._ 

    val input = readRawFromS3(inputPrefix) // returns RDD[Row] 
    .flatMuxPartitions(allSchemes.length, data => { 
     val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] } 
     data.foreach { 
     logItem => { 
      val schemeIndex = allSchemes.indexOf(logItem.logType) 
      if (schemeIndex > -1) { 
      buffers(schemeIndex).append(logItem.row) 
      } 
     } 
     } 
     buffers 
    }) 

    allSchemes.zipWithIndex.foreach { 
    case (schemeName, index) => 
     val rdd = input(index) 

     writeColumnarToS3(rdd, schemeName) 
    } 
} else if (false) { 
    // Naive approach 
    val input = readRawFromS3(inputPrefix) // returns RDD[Row] 
    .persist(StorageLevel.MEMORY_AND_DISK) 

    allSchemes.foreach { 
    schemeName => 
     val rdd = input 
     .filter(x => x.logType == schemeName) 
     .map(x => x.row) 

     writeColumnarToS3(rdd, schemeName) 
    } 

    input.unpersist() 
} else { 
    class CustomPartitioner extends Partitioner { 
    override def numPartitions: Int = allSchemes.length 
    override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String]) 
    } 

    val input = readRawFromS3(inputPrefix) 
     .map(x => (x.logType, x.row)) 
     .partitionBy(new CustomPartitioner()) 
     .map { case (logType, row) => row } 
     .persist(StorageLevel.MEMORY_AND_DISK) 

    allSchemes.zipWithIndex.foreach { 
     case (schemeName, index) => 
     val rdd = input 
      .mapPartitionsWithIndex(
      (i, iter) => if (i == index) iter else Iterator.empty, 
      preservesPartitioning = true 
     ) 

     writeColumnarToS3(rdd, schemeName) 
    } 

    input.unpersist() 
} 

koncepcyjnej, myślę, że kod powinien mieć 1 wyjście DStream danego typu programu i RDD wejściowy powinien odebrać „n umieścić każdy przetworzony element na właściwej DStream (z dozujący dla lepszej wydajności).

Czy ktoś ma jakieś wskazówki, jak to wdrożyć? Czy istnieje lepszy sposób rozwiązania tego problemu?

Odpowiedz

0

To co wymyśliłem w końcu:

użyć niestandardowego partycjonowania partycji danych w oparciu o ich programu plus hashcode rzędu.

Powodem tego jest to, że chcemy mieć możliwość przetwarzania tylko niektórych partycji, ale nadal zezwalamy na udział wszystkich węzłów (ze względu na wydajność). Nie rozpowszechniamy danych tylko na 1 partycji, ale na X partycjach (gdzie X to liczba węzłów razy 2, w tym przykładzie).

Następnie dla każdego schematu przycinamy partycje, których nie potrzebujemy, a tym samym przetwarzamy tylko te, które wykonujemy.

przykładem Kod:

def process(date : ReadableInstant, schemesToProcess : Array[String]) = { 
    // Tweak this based on your use case 
    val DefaultNumberOfStoragePartitions = spark.sparkContext.defaultParallelism * 2 

    class CustomPartitioner extends Partitioner { 
    override def numPartitions: Int = schemesToProcess.length * DefaultNumberOfStoragePartitions 
    override def getPartition(key: Any): Int = { 
     // This is tightly coupled with how `input` gets transformed below 
     val (logType, rowHashCode) = key.asInstanceOf[(String, Int)] 
     (schemesToProcess.indexOf(logType) * DefaultNumberOfStoragePartitions) + Utils.nonNegativeMod(rowHashCode, DefaultNumberOfStoragePartitions) 
    } 

    /** 
     * Internal helper function to retrieve all partition indices for the given key 
     * @param key input key 
     * @return 
     */ 
    private def getPartitions(key: String): Seq[Int] = { 
     val index = schemesToProcess.indexOf(key) * DefaultNumberOfStoragePartitions 
     index until (index + DefaultNumberOfStoragePartitions) 
    } 

    /** 
     * Returns an RDD which only traverses the partitions for the given key 
     * @param rdd base RDD 
     * @param key input key 
     * @return 
     */ 
    def filterRDDForKey[T](rdd: RDD[T], key: String): RDD[T] = { 
     val partitions = getPartitions(key).toSet 
     PartitionPruningRDD.create(rdd, x => partitions.contains(x)) 
    } 
    } 

    val partitioner = new CustomPartitioner() 
    val input = readRawFromS3(date) 
    .map(x => ((x.logType, x.row.hashCode), x.row)) 
    .partitionBy(partitioner) 
    .persist(StorageLevel.MEMORY_AND_DISK_SER) 

    // Initial stage: caches the processed data + gets an enumeration of all schemes in this RDD 
    val schemesInRdd = input 
    .map(_._1._1) 
    .distinct() 
    .collect() 

    // Remaining stages: for each scheme, write it out to S3 as ORC 
    schemesInRdd.zipWithIndex.foreach { 
    case (schemeName, index) => 
     val rdd = partitioner.filterRDDForKey(input, schemeName) 
     .map(_._2) 
     .coalesce(DefaultNumberOfStoragePartitions) 

     writeColumnarToS3(rdd, schemeName) 
    } 

    input.unpersist() 
} 
0

Biorąc pod uwagę, że dane wejściowe to json, można odczytać je w ramce danych ciągów (każda linia jest pojedynczym ciągiem znaków). Następnie możesz wyodrębnić typ z każdego json (używając UDF lub używając funkcji, takich jak get_json_object lub json_tuple).

Teraz masz dwie kolumny: Typ i oryginalny json. Teraz możesz używać opcji partitionBy dla ramek danych podczas zapisu ramki danych. Spowoduje to utworzenie katalogu dla każdego typu, a zawartość katalogu będzie zawierać oryginalne jsony.

Teraz możesz czytać każdy typ z własnym schematem.

Podobnie można zrobić z RDD za pomocą mapy, która zamienia wejściowy rdd na parę rdd z kluczem będącym typem i wartością będącą json konwertowanym na schemat docelowy. Następnie możesz użyć partitionBy i partycji map, aby zapisać każdą partycję w pliku lub możesz użyć funkcji zmniejszania klucza do zapisywania do różnych plików (np. Za pomocą klawisza, aby ustawić nazwę pliku).

Można również spojrzeć na Write to multiple outputs by key Spark - one Spark job

pamiętać, że zakłada przy tym, że celem jest podzielona do pliku. W zależności od konkretnego przypadku użycia, inne opcje mogą być opłacalne.Na przykład, jeśli różne schematy są wystarczająco blisko, możesz utworzyć super schemat, który obejmuje wszystkie z nich i utworzyć ramkę danych bezpośrednio z tego. Następnie możesz albo pracować bezpośrednio na ramce danych, albo użyć partycji danych, aby zapisać różne podtypy w różnych katalogach (ale tym razem już zapisano w parkiecie).

+0

Korzystanie jakąś zunifikowanej "super-schematu" przyszło mi do głowy, jak również. Ale w końcu wydaje się, że to hack. Ponadto, SparkSQL nie wydaje się natywnie obsługiwać UnionType lub czegoś podobnego. Zapisywanie danych jako JSON nie jest rozwiązaniem, do czego służą dane źródłowe. Celem jest zarówno podzielenie plików źródłowych na wiele oddzielnych plików kolumnowych, jak i upewnienie się, że dane są mocno wpisane (dzięki czemu można łatwo wyszukiwać za pomocą np. Presto). – mcuelenaere

+0

Następnie należy wykonać mapę, która przekształci się w parę kluczy z kluczem będącym typem i wartością będącą obiektem. Możesz wtedy albo partycjonować według i mapować partycję, albo użyć skrótu klawiszowego, aby napisać bezpośrednio do pliku docelowego –