栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Java 高并发之设计模式,kafka的原理

Java 高并发之设计模式,kafka的原理

1 public class FutureDemo1 {

2

3     public static void main(String[] args) throws InterruptedException, ExecutionException {

4         FutureTask future = new FutureTask(new Callable() {

5             @Override

6             public String call() throws Exception {

7                 return new RealData().costTime();

8             }

9         });

10         ExecutorService service = Executors.newCachedThreadPool();

11         service.submit(future);

12

13         System.out.println(“RealData方法调用完毕”);

14         // 模拟主函数中其他耗时操作

15         doOtherThing();

16         // 获取RealData方法的结果

17         System.out.println(future.get());

18     }

19

20     private static void doOtherThing() throws InterruptedException {

21         Thread.sleep(2000L);

22     }

23 }

24

25 class RealData {

26

27     public String costTime() {

28         try {

29             // 模拟RealData耗时操作

30             Thread.sleep(1000L);

31             return “result”;

32         } catch (InterruptedException e) {

33             e.printStackTrace();

34         }

35         return “exception”;

36     }

37

38 }

通过Future实现

与上述FutureTask不同的是, RealData需要实现Callable接口

1 public class FutureDemo2 {

2

3     public static void main(String[] args) throws InterruptedException, ExecutionException {

4         ExecutorService service = Executors.newCachedThreadPool();

5         Future future = service.submit(new RealData2());

6

7         System.out.println(“RealData2方法调用完毕”);

8         // 模拟主函数中其他耗时操作

9         doOtherThing();

10         // 获取RealData2方法的结果

11         System.out.println(future.get());

12     }

13

14     private static void doOtherThing() throws InterruptedException {

15         Thread.sleep(2000L);

16     }

17 }

18

19 class RealData2 implements Callable{

20

21     public String costTime() {

22         try {

23             // 模拟RealData耗时操作

24             Thread.sleep(1000L);

25             return “result”;

26         } catch (InterruptedException e) {

27             e.printStackTrace();

28         }

29         return “exception”;

30     }

31

32     @Override

33     public String call() throws Exception {

34         return costTime();

35     }

36 }

另外Future本身还提供了一些额外的简单控制功能, 其API如下

1     // 取消任务

2     boolean cancel(boolean mayInterruptIfRunning);

3     // 是否已经取消

4     boolean isCancelled();

5     // 是否已经完成

6     boolean isDone();

7     // 取得返回对象

8     V get() throws InterruptedException, ExecutionException;

9     // 取得返回对象, 并可以设置超时时间

10     V get(long timeout, TimeUnit unit)

11 throws InterruptedException, ExecutionException, TimeoutException;

生产消费者模式

生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。

在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。

生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。

生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下

PCData为我们需要处理的元数据模型, 生产者构建PCData, 并放入缓冲队列.

消费者从缓冲队列中获取数据, 并执行计算.

生产者核心代码

1         while(isRunning) {

2             Thread.sleep(r.nextInt(SLEEP_TIME));

3             data = new PCData(count.incrementAndGet);

4             // 构造任务数据

5             System.out.println(data + " is put into queue");

6             if (!queue.offer(data, 2, TimeUnit.SECONDS)) {

7                 // 将数据放入队列缓冲区中

8                 System.out.println("faild to put data: " + data);

9             }

10         }

消费者核心代码

1         while (true) {

2             PCData data = queue.take();

3             // 提取任务

4             if (data != null) {

5                 // 获取数据, 执行计算操作

6                 int re = data.getData() * 10;

7                 System.out.println("after cal, value is : " + re);

8                 Thread.sleep(r.nextInt(SLEEP_TIME));

9             }

10         }

生产消费者模式可以有效对数据解耦, 优化系统结构.

降低生产者和消费者线程相互之间的依赖与性能要求.

一般使用BlockingQueue作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步,

如果对缓冲队列有性能要求, 则可以使用基于CAS无锁设计的ConcurrentlinkedQueue.

分而治之

严格来讲, 分而治之不算一种模式, 而是一种思想.

它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量.

我们主要讲两个场景, Master-Worker模式, ForkJoin线程池.

Master-Worker模式

该模式核心思想是系统由两类进行协助工作: Master进程, Worker进程.

Master负责接收与分配任务, Worker负责处理任务. 当各个Worker处理完成后,

将结果返回给Master进行归纳与总结.

假设一个场景, 需要计算100个任务, 并对结果求和, Master持有10个子进程.

Master代码

1 public class MasterDemo {

2     // 盛装任务的集合

3     private ConcurrentlinkedQueue workQueue = new ConcurrentlinkedQueue();

4     // 所有worker

5     private HashMap workers = new HashMap<>();

6     // 每一个worker并行执行任务的结果

7     private ConcurrentHashMap resultMap = new ConcurrentHashMap<>();

8

9     public MasterDemo(WorkerDemo worker, int workerCount) {

10         // 每个worker对象都需要持有queue的引用, 用于领任务与提交结果

11         worker.setResultMap(resultMap);

12         worker.setWorkQueue(workQueue);

13         for (int i = 0; i < workerCount; i++) {

14             workers.put("子节点: " + i, new Thread(worker));

15         }

16     }

17

18     // 提交任务

19     public void submit(TaskDemo task) {

20         workQueue.add(task);

21     }

22

23     // 启动所有的子任务

24     public void execute(){

25         for (Map.Entry entry : workers.entrySet()) {

26             entry.getValue().start();

27         }

28     }

29

30     // 判断所有的任务是否执行结束

31     public boolean isComplete() {

32         for (Map.Entry entry : workers.entrySet()) {

33             if (entry.getValue().getState() != Thread.State.TERMINATED) {

34                 return false;

35             }

36         }

37

38         return true;

39     }

40

41     // 获取最终汇总的结果

42     public int getResult() {

43         int result = 0;

44         for (Map.Entry entry : resultMap.entrySet()) {

45             result += Integer.parseInt(entry.getValue().toString());

46         }

47

48         return result;

49     }

50

51 }

Worker代码

1 public class WorkerDemo implements Runnable{

2

3     private ConcurrentlinkedQueue workQueue;

4     private ConcurrentHashMap resultMap;

5

6     @Override

7     public void run() {

8         while (true) {

9             TaskDemo input = this.workQueue.poll();

10             // 所有任务已经执行完毕

11             if (input == null) {

12                 break;

13             }

14             // 模拟对task进行处理, 返回结果

15             int result = input.getPrice();

16             this.resultMap.put(input.getId() + “”, result);

17             System.out.println("任务执行完毕, 当前线程: " + Thread.currentThread().getName());

18         }

19     }

20

21     public ConcurrentlinkedQueue getWorkQueue() {

22         return workQueue;

23     }

24

25     public void setWorkQueue(ConcurrentlinkedQueue workQueue) {

26         this.workQueue = workQueue;

27     }

28

29     public ConcurrentHashMap getResultMap() {

30         retu

【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】

浏览器打开:qq.cn.hn/FTf 免费领取

rn resultMap;

31     }

32

33     public void setResultMap(ConcurrentHashMap resultMap) {

34         this.resultMap = resultMap;

35     }

36 }

1 public class TaskDemo {

2

3     private int id;

4     private String name;

5     private int price;

6

7     public int getId() {

8         return id;

9     }

10

11     public void setId(int id) {

12         this.id = id;

13     }

14

15     public String getName() {

16         return name;

17     }

18

19     public void setName(String name) {

20         this.name = name;

21     }

22

23     public int getPrice() {

24         return price;

25     }

26

27     public void setPrice(int price) {

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/422601.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号