Przed rozpoczęciem operacji: To jest jeszcze jeden inny groupByKey
. Chociaż ma wiele legalnych aplikacji, jest stosunkowo drogi, więc należy go używać tylko wtedy, gdy jest to wymagane.
Niezupełnie zwięzły lub wydajne rozwiązanie, ale można użyć UserDefinedAggregateFunction
wprowadzony Spark 1.5.0:
object GroupConcat extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
def bufferSchema = new StructType().add("buff", ArrayType(StringType))
def dataType = StringType
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer.update(0, ArrayBuffer.empty[String])
}
def update(buffer: MutableAggregationBuffer, input: Row) = {
if (!input.isNullAt(0))
buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
}
def evaluate(buffer: Row) = UTF8String.fromString(
buffer.getSeq[String](0).mkString(","))
}
Przykład użycia:
val df = sc.parallelize(Seq(
("username1", "friend1"),
("username1", "friend2"),
("username2", "friend1"),
("username2", "friend3")
)).toDF("username", "friend")
df.groupBy($"username").agg(GroupConcat($"friend")).show
## +---------+---------------+
## | username| friends|
## +---------+---------------+
## |username1|friend1,friend2|
## |username2|friend1,friend3|
## +---------+---------------+
Można również utworzyć otoki Python jako wyświetlane w Spark: How to map Python with Scala or Java User Defined Functions?
W praktyce może to być wyodrębnić RDD, groupByKey
, i odbudować DataFrame.
można uzyskać podobny efekt, łącząc collect_list
funkcji (Spark> = 1.6.0) z concat_ws
:
import org.apache.spark.sql.functions.{collect_list, udf, lit}
df.groupBy($"username")
.agg(concat_ws(",", collect_list($"friend")).alias("friends"))
What If Chcę go użyć w SQL Jak mogę zarejestrować to UDF w Spark SQL? –
@MurtazaKanchwala [Istnieje metoda "register", która akceptuje UDAFS] (https://github.com/apache/spark/blob/37c617e4f580482b59e1abbe3c0c27c7125cf605/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration .scala # L63-L69), więc powinien działać jako standardowy UDF. – zero323
@ zero323 dowolne podejście zrobić to samo w iskrze sql 1.4.1 –