Próbuję konwertować dane przechowywane w S3 jako pliki tekstowe JSON-per-line do strukturalnego, kolumnowego formatu, takiego jak ORC lub Parkiet na S3.Demultipleksowanie RDD na wiele tabel ORC
Pliki źródłowe zawierają dane wielu schematów (np. Żądanie HTTP, odpowiedź HTTP, ...), które należy przeanalizować w różnych danych typu Spark odpowiedniego typu.
Przykładowe schematy:
val Request = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("requestMethod", StringType),
StructField("scheme", StringType),
StructField("host", StringType),
StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
StructField("path", StringType),
StructField("sessionId", StringType),
StructField("userAgent", StringType)
))
val Response = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("contentType", StringType),
StructField("contentLength", IntegerType),
StructField("statusCode", StringType),
StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
StructField("responseDuration", DoubleType),
StructField("sessionId", StringType)
))
mam tę część pracuje bez zarzutu, jednak stara się pisać dane z powrotem do S3 tak skutecznie, jak to możliwe wydaje się być kwestia atm.
Próbowałem 3 podejścia:
- muxPartitions z Silex projektu
- buforowanie analizowany wejście S3 i zapętlenie nad nim kilka razy
- co każdy program typu oddzielną partycję z RDD
W pierwszym przypadku JVM zabrakło pamięci, w drugim zaś skończyło się miejsce na dysku.
Trzeci test nie został jeszcze dokładnie przetestowany, ale nie wydaje się to efektywnym wykorzystaniem mocy obliczeniowej (ponieważ tylko jeden węzeł klastra (ten, na którym znajduje się ta partycja) faktycznie zapisywałby dane z powrotem do S3).
odpowiedni kod:
val allSchemes = Schemes.all().keys.toArray
if (false) {
import com.realo.warehouse.multiplex.implicits._
val input = readRawFromS3(inputPrefix) // returns RDD[Row]
.flatMuxPartitions(allSchemes.length, data => {
val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
data.foreach {
logItem => {
val schemeIndex = allSchemes.indexOf(logItem.logType)
if (schemeIndex > -1) {
buffers(schemeIndex).append(logItem.row)
}
}
}
buffers
})
allSchemes.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = input(index)
writeColumnarToS3(rdd, schemeName)
}
} else if (false) {
// Naive approach
val input = readRawFromS3(inputPrefix) // returns RDD[Row]
.persist(StorageLevel.MEMORY_AND_DISK)
allSchemes.foreach {
schemeName =>
val rdd = input
.filter(x => x.logType == schemeName)
.map(x => x.row)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
} else {
class CustomPartitioner extends Partitioner {
override def numPartitions: Int = allSchemes.length
override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
}
val input = readRawFromS3(inputPrefix)
.map(x => (x.logType, x.row))
.partitionBy(new CustomPartitioner())
.map { case (logType, row) => row }
.persist(StorageLevel.MEMORY_AND_DISK)
allSchemes.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = input
.mapPartitionsWithIndex(
(i, iter) => if (i == index) iter else Iterator.empty,
preservesPartitioning = true
)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
}
koncepcyjnej, myślę, że kod powinien mieć 1 wyjście DStream danego typu programu i RDD wejściowy powinien odebrać „n umieścić każdy przetworzony element na właściwej DStream (z dozujący dla lepszej wydajności).
Czy ktoś ma jakieś wskazówki, jak to wdrożyć? Czy istnieje lepszy sposób rozwiązania tego problemu?
Korzystanie jakąś zunifikowanej "super-schematu" przyszło mi do głowy, jak również. Ale w końcu wydaje się, że to hack. Ponadto, SparkSQL nie wydaje się natywnie obsługiwać UnionType lub czegoś podobnego. Zapisywanie danych jako JSON nie jest rozwiązaniem, do czego służą dane źródłowe. Celem jest zarówno podzielenie plików źródłowych na wiele oddzielnych plików kolumnowych, jak i upewnienie się, że dane są mocno wpisane (dzięki czemu można łatwo wyszukiwać za pomocą np. Presto). – mcuelenaere
Następnie należy wykonać mapę, która przekształci się w parę kluczy z kluczem będącym typem i wartością będącą obiektem. Możesz wtedy albo partycjonować według i mapować partycję, albo użyć skrótu klawiszowego, aby napisać bezpośrednio do pliku docelowego –