- 赫夫曼树
- 赫夫曼编码-数据压缩
- 压缩
- 解码
- 数据的加载和保存
- 加载数据
- 保存数据
- MySql
- Hive
- SparkStreaming
- 背压机制
- wordcount
- RDD队列
- 自定义数据采集器
1)给定n个权值作为n个叶子结点,构造一棵二叉树,若该树的带权路径长度(wpl)达到最小,称这样的二叉树为最优二叉树,也称为哈夫曼树(Huffman Tree), 还有的书翻译为霍夫曼树。
2)赫夫曼树是带权路径长度最短的树,权值较大的结点离根较近。
构成赫夫曼树的步骤:
1)从小到大进行排序, 将每一个数据,每个数据都是一个节点 , 每个节点可以看成是一颗最简单的二叉树
2)取出根节点权值最小的两颗二叉树
3)组成一颗新的二叉树, 该新的二叉树的根节点的权值是前面两颗二叉树根节点权值的和
4)再将这颗新的二叉树,以根节点的权值大小 再次排序, 不断重复 1-2-3-4 的步骤,直到数列中,所有的数据都被处理,就得到一颗赫夫曼树
package huffmantree;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
public class HuffmanTree {
public static void main(String[] args) {
int []arr={13,7,8,3,29,6,1};
Node root = createHuffmanTree(arr);
root.preOrder();
}
//创建赫夫曼树的方法
public static Node createHuffmanTree(int[]arr ) {
//1、遍历arr数组
//2、将arr的每个元素构成一个node
//3、将Node放入到ArrayList中
List nodes = new ArrayList<>();
for (int value : arr) {
nodes.add(new Node(value));
}
while (nodes.size() > 1) {
//排序 从小到大
Collections.sort(nodes);
//取出根节点权值最小的两颗二叉树
//(1)取出权值最小的节点(二叉树)
Node leftNode = nodes.get(0);
//(2)取出权值第二小的节点(二叉树)
Node rightNode = nodes.get(1);
//(3)构建一颗新的二叉树
Node parent = new Node(leftNode.value + rightNode.value);
parent.left = leftNode;
parent.right = rightNode;
//(4)从ArrayList中删除处理过的二叉树
nodes.remove(leftNode);
nodes.remove(rightNode);
//(5)将parent加入到nodes
nodes.add(parent);
}
return nodes.get(0);
}
}
//创建节点类
//为了让Node 对象支持排序Collections集合排序
//让Node 实现Comparable接口
class Node implements Comparable{
int value;//节点权值
Node left;//指向左子节点
Node right;//指向右子节点
public Node(int value){
this.value=value;
}
//前序遍历
public void preOrder(){
System.out.println(this.value);
if(this.left!=null)this.left.preOrder();
if(this.right!=null)this.right.preOrder();
}
@Override
public String toString() {
return "Node{" +
"value=" + value +
'}';
}
@Override
public int compareTo(Node o) {
//从小到大排序
return this.value-o.value;
}
}
赫夫曼编码-数据压缩
1)赫夫曼编码也翻译为哈夫曼编码(Huffman Coding),又称霍夫曼编码,是一种编码方式, 属于一种程序算法
2)赫夫曼编码是赫哈夫曼树在电讯通信中的经典的应用之一。
3)赫夫曼编码广泛地用于数据文件压缩。其压缩率通常在20%~90%之间
4)赫夫曼码是可变字长编码(VLC)的一种。Huffman于1952年提出一种编码方法,称之为最佳编码
package huffmantree.huffmancode;
import java.util.*;
public class HuffmanCode {
public static void main(String[] args) {
String content="i like like like java do you like a java";
byte[]contentBytes=content.getBytes();
byte[] huffmanCodesBytes = huffmanZip(contentBytes);
System.out.println("压缩后的结果是:"+Arrays.toString(huffmanCodesBytes));
}
//使用一个方法,将前面的方法封装起来,便于调用
private static byte[] huffmanZip(byte[]bytes){
//
Listnode=getNodes(bytes);
//更具node创建赫夫曼树
Node huffmanTreeRoot=createHuffmanTree(node);
Map huffmanCodes = getCodes(huffmanTreeRoot);
//根据生成的赫夫曼编码,压缩得到压缩后的赫夫曼编码字节数组
return zip(bytes,huffmanCodes);
}
private static byte[] zip(byte[]bytes,Map huffmanCodes){
//1、利用huffmanCodes 将 bytes 转成 赫夫曼编码对应的字符串
StringBuilder stringBuilder = new StringBuilder();
for(byte b:bytes){
stringBuilder.append(huffmanCodes.get(b));
}
//将stringBuilder转成 byte[]
//统计返回 byte[] huffmanCode 长度
int len;
if(stringBuilder.length()%8==0){
len=stringBuilder.length()/8;
}else{
len =stringBuilder.length()/8+1;
}
//创建 存储压缩后的byte数组
byte[]huffmanCodeBytes=new byte[len];
int index=0;
byte[]by=new byte[len];
for (int i = 0; i < stringBuilder.length(); i+=8) {//每八位对应一个byte,所以步长+8
String strByte;
if(i+8>stringBuilder.length()){//不够八位
strByte=stringBuilder.substring(i);
}else {
strByte = stringBuilder.substring(i, i + 8);
}
//将strByte 转换成一个 byte ,放入到 huffmanCodeBytes
huffmanCodeBytes[index]=(byte) Integer.parseInt(strByte,2);
index++;
}
return huffmanCodeBytes;
}
private static ListgetNodes(byte[]bytes){
ArrayList nodes=new ArrayList<>();
//遍历bytes,统计每一个byte出现次数->map[key,value]
Map counts=new HashMap<>();
for(byte b:bytes){
Integer count=counts.getOrDefault(b,0);
counts.put(b,count+1);
}
//把每一个键值对转成一个Node对象,并加入到nodes集合
for(Map.Entryentry:counts.entrySet()){
nodes.add(new Node(entry.getKey(),entry.getValue()));
}
return nodes;
}
//创建对应的赫夫曼树
private static Node createHuffmanTree(Listnodes){
while(nodes.size()>1){
//从小到大
Collections.sort(nodes);
Node left=nodes.get(0);
Node right=nodes.get(1);
Node parent=new Node(null,left.weight+right.weight);
parent.left=left;
parent.right=right;
nodes.remove(left);
nodes.remove(right);
nodes.add(parent);
}
return nodes.get(0);
}
//生成赫夫曼树对应的赫夫曼编码
static Map huffmanCodes=new HashMap<>();
//static StringBuilder stringBuilder=new StringBuilder();
private static MapgetCodes(Node root){
if(root==null){
return null;
}
//处理root的左子树
getCodes(root.left,"0",new StringBuilder());
//处理root的右子树
getCodes(root.right,"1",new StringBuilder());
return huffmanCodes;
}
private static void getCodes(Node node,String code,StringBuilder stringBuilder){
StringBuilder stringBuilder2 = new StringBuilder(stringBuilder);
//将code加到stringBuilder2
stringBuilder2.append(code);
if(node!=null){//如果node==null 不处理
//判断当前node 是叶子结点还是非叶子结点
if(node.data==null){//非叶子结点
//递归处理
//向左
getCodes(node.left,"0",stringBuilder2);
//向右递归
getCodes(node.right,"1",stringBuilder2);
}else{//说明是一个叶子结点
//表示找到某个叶子结点的最后
huffmanCodes.put(node.data,stringBuilder2.toString());
}
}
}
}
class Node implements Comparable{
Byte data;//存放数据
int weight;//权值,表示字符出现的次数
Node left;
Node right;
public Node(Byte data, int weight) {
this.data = data;
this.weight = weight;
}
@Override
public int compareTo(Node o) {
//从小到大排序
return this.weight-o.weight;
}
@Override
public String toString() {
return "Node{" +
"data=" + data +
", weight=" + weight +
'}';
}
//前序遍历
public void preOrder(){
System.out.println(this);
if(this.left!=null)this.left.preOrder();
if(this.right!=null)this.right.preOrder();
}
}
解码
public class HuffmanCode {
public static void main(String[] args) {
String content = "i like like like java do you like a java";
byte[] contentBytes = content.getBytes();
byte[] huffmanCodesBytes = huffmanZip(contentBytes);
System.out.println("压缩后的结果是:" + Arrays.toString(huffmanCodesBytes));
byte[] sourceBytes = decode(huffmanCodes, huffmanCodesBytes);
System.out.println("原来的字符串:"+new String(sourceBytes));
}
//编写一个方法,完成对压缩数据的解码
private static byte[]decode(MaphuffmanCodes,byte[]huffmanBytes){
//先得到huffmanBytes对应的二进制的字符串,
StringBuilder stringBuilder = new StringBuilder();
//将byte数组转成二进制的字符串
for (int i = 0; i < huffmanBytes.length; i++) {
byte b=huffmanBytes[i];
//判断是不是最后一个字节
boolean flag=(i==huffmanBytes.length-1);
stringBuilder.append(byteToBitString(!flag,b));
}
System.out.println(stringBuilder);
//把字符按串按照指定的赫夫曼编码进行解码
//把赫夫曼编码表进行调换
Mapmap=new HashMap<>();
for(Map.Entry entry:huffmanCodes.entrySet()){
map.put(entry.getValue(),entry.getKey());
}
//创建一个集合,存放byte
Listlist=new ArrayList<>();
//i 可以理解成一个索引 扫描 stringBuilder
for(int i=0;i
数据的加载和保存
SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的 API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式 为 parquet
加载数据
spark.read.load 是加载数据的通用方法
csv format jdbc json load option options orc parquet schema
table text textFile
如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")
➢ format("…"):指定加载的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。
➢ load("…"):在"csv"、“jdbc”、“json”、“orc”、“parquet"和"textFile"格式下需要传入加载 数据的路径。 ➢ option(”…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 我们前面都是使用 read API 先把文件加载到 Dataframe 然后再查询,其实,我们也可以直 接在文件上进行查询: 文件格式.文件路径
scala>spark.sql(“select * from json./opt/module/data/user.json”).show
保存数据
df.write.save 是保存数据的通用方法
csv jdbc json orc parquet textFile… …
如果保存不同格式的数据,可以对不同的数据格式进行设定
scala>df.write.format("…")[.option("…")].save("…")
➢ format("…"):指定保存的数据类型,包括"csv"、“jdbc”、“json”、“orc”、"parquet"和 “textFile”。
➢ save ("…"):在"csv"、“orc”、"parquet"和"textFile"格式下需要传入保存数据的路径。
➢ option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable 保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。 有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。
SaveMode 是一个枚举类,其中的常量包括:
Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常
SaveMode.Append “append” 如果文件已经存在则追加
SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖
SaveMode.Ignore “ignore” 如果文件已经存在则忽略
df.write.mode(“append”).json("/opt/module/data/output")
MySql
package sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}
object SparkSqlJDBC {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark: SparkSession =SparkSession.builder().config(sparkConf).getOrCreate()
//读取mysql数据
val df: Dataframe = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/jw?user=root&password=1234")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("password", "1234")
.option("dbtable", "course")
.load()
df.show()
//保存数据
df.write
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/test?user=root&password=1234")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("password", "1234")
.option("dbtable", "course")
.mode(SaveMode.Append)
.save()
}
}
Hive
org.apache.spark
spark-hive_2.12
3.0.0
org.apache.hive
hive-exec
1.2.1
mysql
mysql-connector-java
5.1.27
将 hive-site.xml 文件拷贝到项目的 resources 目录中,代码实现
#显示表头
hive.cli.print.header
true
hive.cli.print.current.db
true
javax.jdo.option.ConnectionURL
jdbc:mysql://hadoop103:3306/metastore?createDatabaseIfNotExist=true
JDBC connect string for a JDBC metastore
javax.jdo.option.ConnectionDriverName
com.mysql.jdbc.Driver
Driver class name for a JDBC metastore
javax.jdo.option.ConnectionUserName
root
username to use against metastore database
javax.jdo.option.ConnectionPassword
#密码
123456
password to use against metastore database
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark: SparkSession =SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
spark.sql("show tables").show()
}
SparkStreaming
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、 Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语 如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
SparkStreaming 准实时(秒,分钟),微批次(时间) 的数据处理框架
DStream 就是对 RDD 在实时数据处理场景的一种封装。
背压机制
为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制 数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值 false,即不启用。
wordcount
org.apache.spark
spark-streaming_2.12
3.0.0
➢ 需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并 统计不同单词出现的次数
package sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object WordCount {
def main(args: Array[String]): Unit = {
//创建环境对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCont")
//第一个参数表示环境配置
//第二个参数表示批量处理的周期(采集周期)
val ssc: StreamingContext =new StreamingContext(sparkConf,Seconds(3))
//获取端口数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordToOne: DStream[(String, Int)] = words.map((_, 1))
val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)
wordToCount.print()
wordToCount.saveAsTextFiles("datas/result")
//由于SparkStreaming采集器是长期执行的任务 不能直接关闭
//如果main方法执行完毕,应用程序也会自动结束,所以不能让main执行完毕
//1、启动采集器
ssc.start()
//2、等待采集器的关闭
ssc.awaitTermination()
}
}
RDD队列
测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到 这个队列中的 RDD,都会作为一个 DStream 处理。
➢ 需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount
package streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object Queue {
def main(args: Array[String]): Unit = {
//1.初始化 Spark 配置信息
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
//2.初始化 SparkStreamingContext
val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))
//3.创建 RDD 队列
val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream: DStream[(Int, Int)] = inputStream.map((_,1))
val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动任务
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
自定义数据采集器
package streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Random
object DIY {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
val ssc: StreamingContext = new StreamingContext(conf, Seconds(4))
val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
messageDS.print()
ssc.start()
ssc.awaitTermination()
}
}
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
private var flag=true;
override def onStart(): Unit = {
new Thread(new Runnable {
override def run(): Unit = {
while(flag){
val message: String ="采集的数据为"+new Random().nextInt(10).toString
store(message)
Thread.sleep(500)
}
}
}).start()
}
override def onStop(): Unit = {
flag=false
}
}



