栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

数据结构+spark

数据结构+spark

数据结构+spark
  • 赫夫曼树
  • 赫夫曼编码-数据压缩
    • 压缩
    • 解码
  • 数据的加载和保存
    • 加载数据
    • 保存数据
    • MySql
    • Hive
    • SparkStreaming
  • 背压机制
      • wordcount
      • RDD队列
      • 自定义数据采集器

赫夫曼树

1)给定n个权值作为n个叶子结点,构造一棵二叉树,若该树的带权路径长度(wpl)达到最小,称这样的二叉树为最优二叉树,也称为哈夫曼树(Huffman Tree), 还有的书翻译为霍夫曼树。

2)赫夫曼树是带权路径长度最短的树,权值较大的结点离根较近。

构成赫夫曼树的步骤:

1)从小到大进行排序, 将每一个数据,每个数据都是一个节点 , 每个节点可以看成是一颗最简单的二叉树

2)取出根节点权值最小的两颗二叉树

3)组成一颗新的二叉树, 该新的二叉树的根节点的权值是前面两颗二叉树根节点权值的和

4)再将这颗新的二叉树,以根节点的权值大小 再次排序, 不断重复 1-2-3-4 的步骤,直到数列中,所有的数据都被处理,就得到一颗赫夫曼树

package huffmantree;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class HuffmanTree {

    public static void main(String[] args) {
        int []arr={13,7,8,3,29,6,1};

        Node root = createHuffmanTree(arr);
        root.preOrder();
    }


    //创建赫夫曼树的方法

    public static Node createHuffmanTree(int[]arr ) {
        //1、遍历arr数组
        //2、将arr的每个元素构成一个node
        //3、将Node放入到ArrayList中
        List nodes = new ArrayList<>();
        for (int value : arr) {
            nodes.add(new Node(value));
        }

        while (nodes.size() > 1) {
            //排序 从小到大
            Collections.sort(nodes);

            //取出根节点权值最小的两颗二叉树
            //(1)取出权值最小的节点(二叉树)
            Node leftNode = nodes.get(0);
            //(2)取出权值第二小的节点(二叉树)

            Node rightNode = nodes.get(1);

            //(3)构建一颗新的二叉树
            Node parent = new Node(leftNode.value + rightNode.value);
            parent.left = leftNode;
            parent.right = rightNode;

            //(4)从ArrayList中删除处理过的二叉树
            nodes.remove(leftNode);
            nodes.remove(rightNode);

            //(5)将parent加入到nodes
            nodes.add(parent);
        }

        return nodes.get(0);
    }
}

//创建节点类
//为了让Node 对象支持排序Collections集合排序
//让Node 实现Comparable接口

class Node implements Comparable{
    int value;//节点权值
    Node left;//指向左子节点
    Node right;//指向右子节点
    public Node(int value){
        this.value=value;
    }

    //前序遍历
    public void preOrder(){
        System.out.println(this.value);
        if(this.left!=null)this.left.preOrder();
        if(this.right!=null)this.right.preOrder();

    }

  @Override
    public String toString() {
        return "Node{" +
                "value=" + value +
                '}';
    }

    @Override
    public int compareTo(Node o) {
        //从小到大排序
        return this.value-o.value;
    }
}

赫夫曼编码-数据压缩

1)赫夫曼编码也翻译为哈夫曼编码(Huffman Coding),又称霍夫曼编码,是一种编码方式, 属于一种程序算法

2)赫夫曼编码是赫哈夫曼树在电讯通信中的经典的应用之一。

3)赫夫曼编码广泛地用于数据文件压缩。其压缩率通常在20%~90%之间

4)赫夫曼码是可变字长编码(VLC)的一种。Huffman于1952年提出一种编码方法,称之为最佳编码

压缩
package huffmantree.huffmancode;

import java.util.*;
public class HuffmanCode {
    public static void main(String[] args) {

        String content="i like like like java do you like a java";
        byte[]contentBytes=content.getBytes();

        byte[] huffmanCodesBytes = huffmanZip(contentBytes);
        System.out.println("压缩后的结果是:"+Arrays.toString(huffmanCodesBytes));

    }

    //使用一个方法,将前面的方法封装起来,便于调用
    private static byte[] huffmanZip(byte[]bytes){
        //
        Listnode=getNodes(bytes);
        //更具node创建赫夫曼树
        Node huffmanTreeRoot=createHuffmanTree(node);

        Map huffmanCodes = getCodes(huffmanTreeRoot);
        //根据生成的赫夫曼编码,压缩得到压缩后的赫夫曼编码字节数组
        return zip(bytes,huffmanCodes);

    }

    
    private static byte[] zip(byte[]bytes,Map huffmanCodes){
        //1、利用huffmanCodes 将 bytes 转成 赫夫曼编码对应的字符串
        StringBuilder stringBuilder = new StringBuilder();
        for(byte b:bytes){
            stringBuilder.append(huffmanCodes.get(b));
        }

        //将stringBuilder转成 byte[]
        //统计返回 byte[] huffmanCode 长度
        int len;
        if(stringBuilder.length()%8==0){
            len=stringBuilder.length()/8;
        }else{
            len =stringBuilder.length()/8+1;
        }

        //创建 存储压缩后的byte数组
        byte[]huffmanCodeBytes=new byte[len];
        int index=0;

        byte[]by=new byte[len];
        for (int i = 0; i < stringBuilder.length(); i+=8) {//每八位对应一个byte,所以步长+8
            String strByte;
            if(i+8>stringBuilder.length()){//不够八位
                strByte=stringBuilder.substring(i);
            }else {
                strByte = stringBuilder.substring(i, i + 8);
            }
            //将strByte 转换成一个 byte ,放入到 huffmanCodeBytes
            huffmanCodeBytes[index]=(byte) Integer.parseInt(strByte,2);
            index++;

        }
        return huffmanCodeBytes;

    }

    private static ListgetNodes(byte[]bytes){
        ArrayList nodes=new ArrayList<>();

        //遍历bytes,统计每一个byte出现次数->map[key,value]
        Map counts=new HashMap<>();
        for(byte b:bytes){
            Integer count=counts.getOrDefault(b,0);
            counts.put(b,count+1);
        }

        //把每一个键值对转成一个Node对象,并加入到nodes集合
        for(Map.Entryentry:counts.entrySet()){
            nodes.add(new Node(entry.getKey(),entry.getValue()));
        }


        return nodes;
    }

    //创建对应的赫夫曼树
    private static Node createHuffmanTree(Listnodes){
        while(nodes.size()>1){
            //从小到大
            Collections.sort(nodes);

            Node left=nodes.get(0);
            Node right=nodes.get(1);

            Node parent=new Node(null,left.weight+right.weight);
            parent.left=left;
            parent.right=right;

            nodes.remove(left);
            nodes.remove(right);

            nodes.add(parent);
        }
        return nodes.get(0);
    }

    //生成赫夫曼树对应的赫夫曼编码
    static Map huffmanCodes=new HashMap<>();
    //static StringBuilder stringBuilder=new StringBuilder();

    private static MapgetCodes(Node root){
        if(root==null){
            return null;
        }
        //处理root的左子树
        getCodes(root.left,"0",new StringBuilder());
        //处理root的右子树
        getCodes(root.right,"1",new StringBuilder());
        return huffmanCodes;
    }

    
    private static void getCodes(Node node,String code,StringBuilder stringBuilder){
        StringBuilder stringBuilder2 = new StringBuilder(stringBuilder);
        //将code加到stringBuilder2
        stringBuilder2.append(code);
        if(node!=null){//如果node==null 不处理
            //判断当前node 是叶子结点还是非叶子结点
            if(node.data==null){//非叶子结点
                //递归处理
                //向左
                getCodes(node.left,"0",stringBuilder2);
                //向右递归
                getCodes(node.right,"1",stringBuilder2);
            }else{//说明是一个叶子结点
                //表示找到某个叶子结点的最后
                huffmanCodes.put(node.data,stringBuilder2.toString());

            }
        }

    }




}
class Node implements Comparable{
    Byte data;//存放数据
    int weight;//权值,表示字符出现的次数

    Node left;
    Node right;

    public Node(Byte data, int weight) {
        this.data = data;
        this.weight = weight;
    }


    @Override
    public int compareTo(Node o) {
        //从小到大排序
        return this.weight-o.weight;
    }

    @Override
    public String toString() {
        return "Node{" +
                "data=" + data +
                ", weight=" + weight +
                '}';
    }

    //前序遍历
    public void preOrder(){
        System.out.println(this);
        if(this.left!=null)this.left.preOrder();
        if(this.right!=null)this.right.preOrder();
    }

}



解码
public class HuffmanCode {
    public static void main(String[] args) {

        String content = "i like like like java do you like a java";
        byte[] contentBytes = content.getBytes();

        byte[] huffmanCodesBytes = huffmanZip(contentBytes);
        System.out.println("压缩后的结果是:" + Arrays.toString(huffmanCodesBytes));

        byte[] sourceBytes = decode(huffmanCodes, huffmanCodesBytes);
        System.out.println("原来的字符串:"+new String(sourceBytes));


    }

    //编写一个方法,完成对压缩数据的解码

    
    private static byte[]decode(MaphuffmanCodes,byte[]huffmanBytes){

        //先得到huffmanBytes对应的二进制的字符串,

        StringBuilder stringBuilder = new StringBuilder();
        //将byte数组转成二进制的字符串
        for (int i = 0; i < huffmanBytes.length; i++) {
            byte b=huffmanBytes[i];
            //判断是不是最后一个字节
            boolean flag=(i==huffmanBytes.length-1);
            stringBuilder.append(byteToBitString(!flag,b));
        }
        System.out.println(stringBuilder);


        //把字符按串按照指定的赫夫曼编码进行解码
        //把赫夫曼编码表进行调换
        Mapmap=new HashMap<>();

        for(Map.Entry entry:huffmanCodes.entrySet()){
            map.put(entry.getValue(),entry.getKey());
        }

        //创建一个集合,存放byte
        Listlist=new ArrayList<>();
        //i 可以理解成一个索引 扫描 stringBuilder
        for(int i=0;i 

数据的加载和保存

SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的 API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式 为 parquet

加载数据

spark.read.load 是加载数据的通用方法

csv format jdbc json load option options orc parquet schema 
table text textFile

如果读取不同格式的数据,可以对不同的数据格式进行设定

scala> spark.read.format("…")[.option("…")].load("…")

➢ format("…"):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。

➢ load("…"):在"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile"格式下需要传入加载 数据的路径。 ➢ option(”…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 我们前面都是使用 read API 先把文件加载到 Dataframe 然后再查询,其实,我们也可以直 接在文件上进行查询: 文件格式.文件路径

scala>spark.sql(“select * from json./opt/module/data/user.json”).show

保存数据

df.write.save 是保存数据的通用方法

csv jdbc json orc parquet textFile… …

如果保存不同格式的数据,可以对不同的数据格式进行设定

scala>df.write.format("…")[.option("…")].save("…")

➢ format("…"):指定保存的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。

➢ save ("…"):在"csv"、“orc”、"parquet"和"textFile"格式下需要传入保存数据的路径。

➢ option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。 有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。

SaveMode 是一个枚举类,其中的常量包括:

Scala/Java Any Language Meaning

SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常

SaveMode.Append “append” 如果文件已经存在则追加

SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖

SaveMode.Ignore “ignore” 如果文件已经存在则忽略

df.write.mode(“append”).json("/opt/module/data/output")

MySql
package sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}

object SparkSqlJDBC {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark: SparkSession =SparkSession.builder().config(sparkConf).getOrCreate()

    //读取mysql数据
    val df: Dataframe = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/jw?user=root&password=1234")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("password", "1234")
      .option("dbtable", "course")
      .load()

    df.show()

    //保存数据
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/test?user=root&password=1234")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("password", "1234")
      .option("dbtable", "course")
      .mode(SaveMode.Append)
      .save()


  }

}
Hive

 org.apache.spark
 spark-hive_2.12
 3.0.0


 org.apache.hive
 hive-exec
 1.2.1


 mysql
 mysql-connector-java
 5.1.27

将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现




    #显示表头
    
        hive.cli.print.header
        true
    

    
        hive.cli.print.current.db
        true
    

    
        javax.jdo.option.ConnectionURL
        jdbc:mysql://hadoop103:3306/metastore?createDatabaseIfNotExist=true
        JDBC connect string for a JDBC metastore
    

    
        javax.jdo.option.ConnectionDriverName
        com.mysql.jdbc.Driver
        Driver class name for a JDBC metastore
    

    
        javax.jdo.option.ConnectionUserName
        root
        username to use against metastore database
    

    
        javax.jdo.option.ConnectionPassword
        #密码
        123456
        password to use against metastore database
    


def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark: SparkSession =SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()

    spark.sql("show tables").show()

  }
SparkStreaming

Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语 如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

SparkStreaming 准实时(秒,分钟),微批次(时间) 的数据处理框架

DStream 就是对 RDD 在实时数据处理场景的一种封装。

背压机制

​ 为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制 数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。

​ 通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值 false,即不启用。

wordcount

 org.apache.spark
 spark-streaming_2.12
 3.0.0

➢ 需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并 统计不同单词出现的次数

package sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object WordCount {
  def main(args: Array[String]): Unit = {
    //创建环境对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCont")
    //第一个参数表示环境配置
    //第二个参数表示批量处理的周期(采集周期)
    val ssc: StreamingContext =new StreamingContext(sparkConf,Seconds(3))

    //获取端口数据
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val words: DStream[String] = lines.flatMap(_.split(" "))

    val wordToOne: DStream[(String, Int)] = words.map((_, 1))

    val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

    wordToCount.print()

    wordToCount.saveAsTextFiles("datas/result")

    //由于SparkStreaming采集器是长期执行的任务 不能直接关闭
    //如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕

    //1、启动采集器
    ssc.start()
    //2、等待采集器的关闭
    ssc.awaitTermination()



  }


}

RDD队列

测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。

➢ 需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount

package streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable


object Queue {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
    //2.初始化 SparkStreamingContext
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))
    //3.创建 RDD 队列
    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
    //4.创建 QueueInputDStream
    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)
    //5.处理队列中的 RDD 数据
    val mappedStream: DStream[(Int, Int)] = inputStream.map((_,1))
    val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)
    //6.打印结果
    reducedStream.print()
    //7.启动任务
    ssc.start()
    //8.循环创建并向 RDD 队列中放入 RDD
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }

}

自定义数据采集器
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Random

object DIY {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))

    val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    messageDS.print()



    ssc.start()
    ssc.awaitTermination()
  }

}


class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){

  private var flag=true;

  override def onStart(): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        while(flag){
          val message: String ="采集的数据为"+new Random().nextInt(10).toString
          store(message)
          Thread.sleep(500)
        }
      }
    }).start()

  }

  override def onStop(): Unit = {
    flag=false

  }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600107.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号