栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

从Spark GroupedData对象中选择随机项目

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

从Spark GroupedData对象中选择随机项目

好吧,这是错误的。

GroupedData
并不是真正为数据访问而设计的。它仅描述分组标准并提供聚合方法。

这个想法的另一个问题是选择

N randomsamples
。如果没有对数据进行心理分组,这实际上是很难并行完成的任务,并且当您
call
对a进行分组时,这不会发生
Dataframe

至少有两种方法可以解决此问题:

  • 转换为RDD,

    groupBy
    并执行本地采样

    import random

    n = 3

    def sample(iter, n):
    rs = random.Random() # We should probably use os.urandom as a seed
    return rs.sample(list(iter), n)

    df = sqlContext.createDataframe(
    [(x, y, random.random()) for x in (1, 2, 3) for y in “abcdefghi”],
    (“teamId”, “x1”, “x2”))

    grouped = df.rdd.map(lambda row: (row.teamId, row)).groupByKey()

    sampled = sqlContext.createDataframe(
    grouped.flatMap(lambda kv: sample(kv[1], n)))

    sampled.show()

    +------+—+-------------------+

    |teamId| x1| x2|

    +------+—+-------------------+

    | 1| g| 0.81921738561455|

    | 1| f| 0.8563875814036598|

    | 1| a| 0.9010425238735935|

    | 2| c| 0.3864428179837973|

    | 2| g|0.06233470405822805|

    | 2| d|0.37620872770129155|

    | 3| f| 0.7518901502732027|

    | 3| e| 0.5142305439671874|

    | 3| d| 0.6250620479303716|

    +------+—+-------------------+

  • 使用窗口功能

    from pyspark.sql import Window

    from pyspark.sql.functions import col, rand, rowNumber

    w = Window.partitionBy(col(“teamId”)).orderBy(col(“rnd_”))

    sampled = (df
    .withColumn(“rnd_”, rand()) # Add random numbers column
    .withColumn(“rn_”, rowNumber().over(w)) # Add rowNumber over windw
    .where(col(“rn_”) <= n) # Take n observations
    .drop(“rn_”) # drop helper columns
    .drop(“rnd_”))

    sampled.show()

    +------+—+--------------------+

    |teamId| x1| x2|

    +------+—+--------------------+

    | 1| f| 0.8563875814036598|

    | 1| g| 0.81921738561455|

    | 1| i| 0.8173912535268248|

    | 2| h| 0.10862995810038856|

    | 2| c| 0.3864428179837973|

    | 2| a| 0.6695356657072442|

    | 3| b|0.012329360826023095|

    | 3| a| 0.6450777858109182|

    | 3| e| 0.5142305439671874|

    +------+—+--------------------+

但恐怕两者都会相当昂贵。如果单个组的大小平衡且相对较大,我将简单地使用

Dataframe.randomSplit

如果组数相对较少,则可以尝试其他方法:

from pyspark.sql.functions import count, udffrom pyspark.sql.types import BooleanTypefrom operator import truedivcounts = (df    .groupBy(col("teamId"))    .agg(count("*").alias("n"))    .rdd.map(lambda r: (r.teamId, r.n))    .collectAsMap())# This defines fraction of observations from a group which should# be taken to get n values counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()})to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())sampled = (df    .withColumn("rnd_", rand())    .where(to_take(col("teamId"), col("rnd_")))    .drop("rnd_"))sampled.show()## +------+---+--------------------+## |teamId| x1|       x2|## +------+---+--------------------+## |     1|  d| 0.14815204548854788|## |     1|  f|  0.8563875814036598|## |     1|  g|    0.81921738561455|## |     2|  a|  0.6695356657072442|## |     2|  d| 0.37620872770129155|## |     2|  g| 0.06233470405822805|## |     3|  b|0.012329360826023095|## |     3|  h|  0.9022527556458557|## +------+---+--------------------+

在Spark 1.5+中,您可以替换

udf
为对
sampleBy
方法的调用:

df.sampleBy("teamId", counts_bd.value)

它不会为您提供确切的观测值数量,但在大多数情况下应该足够好,只要每组的观测值数量足以获取适当的样本即可。您也可以

sampleByKey
类似的方式在RDD上使用。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/645769.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号