Well, it is kind of wrong. GroupedData is not really designed for data access. It only describes the grouping criteria and provides aggregation methods.

Another problem with this idea is selecting N random samples. That is a task which is really hard to achieve in parallel without physically grouping the data first, and it is not something that happens when you call groupBy on a DataFrame.
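To make that concrete, here is a tiny sketch (my own illustration, assuming the same sqlContext used in the snippets below): groupBy only returns a GroupedData handle with aggregation methods, and there is no way to pull the rows of a single group back out of it.

# Illustration only (not part of the original snippets): groupBy returns a
# GroupedData object that describes the grouping; it does not expose the rows.
tmp = sqlContext.createDataFrame(
    [(1, "a"), (1, "b"), (2, "c")], ("teamId", "x1"))

grouped = tmp.groupBy("teamId")
print(type(grouped))    # <class 'pyspark.sql.group.GroupedData'>
grouped.count().show()  # aggregations like count()/agg() are the only way out
# grouped has no collect()/show()/head(), so you cannot read the groups directly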
There are at least two ways to handle this:

convert to RDD, groupBy and perform local sampling:

import random

n = 3

def sample(iter, n):
    rs = random.Random()  # We should probably use os.urandom as a seed
    return rs.sample(list(iter), n)

df = sqlContext.createDataFrame(
    [(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"],
    ("teamId", "x1", "x2"))

grouped = df.rdd.map(lambda row: (row.teamId, row)).groupByKey()

sampled = sqlContext.createDataFrame(
    grouped.flatMap(lambda kv: sample(kv[1], n)))

sampled.show()
+------+---+-------------------+
|teamId| x1|                 x2|
+------+---+-------------------+
|     1|  g|   0.81921738561455|
|     1|  f| 0.8563875814036598|
|     1|  a| 0.9010425238735935|
|     2|  c| 0.3864428179837973|
|     2|  g|0.06233470405822805|
|     2|  d|0.37620872770129155|
|     3|  f| 0.7518901502732027|
|     3|  e| 0.5142305439671874|
|     3|  d| 0.6250620479303716|
+------+---+-------------------+
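One caveat with this local-sampling helper (my addition, not part of the original answer): random.Random.sample raises a ValueError when a group holds fewer than n rows, so with small groups you may want to cap the sample size, for example:

# Hypothetical variant of sample() above: take the whole group when it is
# smaller than n instead of raising ValueError.
import random

def sample_capped(iter, n):
    rs = random.Random()
    rows = list(iter)
    return rs.sample(rows, min(n, len(rows)))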
use window functions:
from pyspark.sql import Window
from pyspark.sql.functions import col, rand, rowNumber

w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))

sampled = (df
    .withColumn("rnd_", rand())  # Add random numbers column
    .withColumn("rn_", rowNumber().over(w))  # Add rowNumber over window
    .where(col("rn_") <= n)  # Take n observations
    .drop("rn_")  # drop helper columns
    .drop("rnd_"))

sampled.show()
+------+---+--------------------+
|teamId| x1|                  x2|
+------+---+--------------------+
|     1|  f|  0.8563875814036598|
|     1|  g|    0.81921738561455|
|     1|  i|  0.8173912535268248|
|     2|  h| 0.10862995810038856|
|     2|  c|  0.3864428179837973|
|     2|  a|  0.6695356657072442|
|     3|  b|0.012329360826023095|
|     3|  a|  0.6450777858109182|
|     3|  e|  0.5142305439671874|
+------+---+--------------------+
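A side note of mine (not in the original snippet): later Spark releases (1.6+) renamed rowNumber to row_number, so on a newer PySpark the same window approach would look like this:

# Same logic with the newer function name; the rest of the snippet is unchanged.
from pyspark.sql import Window
from pyspark.sql.functions import col, rand, row_number

w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))

sampled = (df
    .withColumn("rnd_", rand())
    .withColumn("rn_", row_number().over(w))
    .where(col("rn_") <= n)
    .drop("rn_")
    .drop("rnd_"))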
But I am afraid both will be rather expensive. If the size of individual groups is balanced and relatively large, I would simply use DataFrame.randomSplit.
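For reference, randomSplit works on the whole DataFrame with random weights rather than per group, which is why it is only a good fit when the groups are balanced; a minimal sketch (the weights and seed are just an illustrative assumption):

# Keep roughly one third of the rows at random; this samples across the whole
# DataFrame, not n rows per group.
sample_df, rest_df = df.randomSplit([1.0, 2.0], seed=42)
sample_df.show()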
If the number of groups is relatively small, it is possible to try something else:
from pyspark.sql.functions import count, udf
from pyspark.sql.types import BooleanType
from operator import truediv

counts = (df
    .groupBy(col("teamId"))
    .agg(count("*").alias("n"))
    .rdd.map(lambda r: (r.teamId, r.n))
    .collectAsMap())

# This defines fraction of observations from a group which should
# be taken to get n values
counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()})

to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())

sampled = (df
    .withColumn("rnd_", rand())
    .where(to_take(col("teamId"), col("rnd_")))
    .drop("rnd_"))

sampled.show()

+------+---+--------------------+
|teamId| x1|                  x2|
+------+---+--------------------+
|     1|  d| 0.14815204548854788|
|     1|  f|  0.8563875814036598|
|     1|  g|    0.81921738561455|
|     2|  a|  0.6695356657072442|
|     2|  d| 0.37620872770129155|
|     2|  g| 0.06233470405822805|
|     3|  b|0.012329360826023095|
|     3|  h|  0.9022527556458557|
+------+---+--------------------+

In Spark 1.5+ you can replace the udf with a call to the sampleBy method:

df.sampleBy("teamId", counts_bd.value)

It won't give you the exact number of observations, but it should be good enough most of the time, as long as the number of observations per group is large enough to get proper samples. You can also use sampleByKey on an RDD in a similar way.
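A rough sketch of that last variant (my own illustration, reusing the per-group fractions broadcast above): sampleByKey takes a dictionary of per-key fractions, so the same values work on a (teamId, row) pair RDD.

# Approximate n-per-group sampling at the RDD level; fractions maps
# teamId -> n / group_size, the same values stored in counts_bd.
pair_rdd = df.rdd.map(lambda row: (row.teamId, row))
sampled_rdd = pair_rdd.sampleByKey(
    withReplacement=False,
    fractions=counts_bd.value,
    seed=42)
sampled_df = sqlContext.createDataFrame(sampled_rdd.values())
sampled_df.show()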



