栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

在单个后台线程定期修改地图的同时读取地图

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

在单个后台线程定期修改地图的同时读取地图

您使用以下同步技术。

  1. 具有实时套接字数据的映射位于原子引用的后面,因此可以安全地切换映射。
  2. updateLiveSockets()
    方法是同步的(隐式地),这将防止同时通过两个线程切换映射。
  3. 如果在
    getNextSocket()
    方法期间发生切换,则在使用地图时要对地图进行本地引用以避免混淆。

是否像现在一样是线程安全的?

线程安全性始终取决于共享的可变数据上是否存在正确的同步。在这种情况下,共享的可变数据是数据中心到其SocketHolders列表的映射。

地图位于中

AtomicReference
,并制作本地副本以供使用的事实足以在地图上实现同步。您的方法采用了地图的一个版本,并使用该版本,由于的性质,切换版本是线程安全的
AtomicReference
。只需为地图创建member字段,也可以实现此目的
volatile
,因为您要做的只是更新参考(您无需对其进行任何先检查后操作操作)。

由于可以

scheduleAtFixedRate()
确保所传递的消息
Runnable
不会与自身并发运行,因此不需要
synchronized
on
updateLiveSockets()
,但是它也没有任何实质性的危害。

所以是的,该类是线程安全的。

但是,尚不清楚a是否

SocketHolder
可以同时被多个线程使用。照原样,此类仅尝试
SocketHolder
通过选择一个随机活动对象来最大程度地减少对s的并发使用(尽管无需重新整理整个数组以选择一个随机索引)。它实际上没有阻止并发使用。

可以提高效率吗?

我相信可以。当查看该

updateLiveSockets()
方法时,似乎它会构建完全相同的映射,只是
SocketHolder
s的
isLive
标志值可能不同。这使我得出结论,我只想切换地图中的每个列表,而不是切换整个地图。为了以线程安全的方式更改映射中的条目,我可以使用
ConcurrentHashMap

如果我使用

ConcurrentHashMap
,并且不切换地图,而是切换地图中的值,则可以摆脱
AtomicReference

要更改映射,我可以构建新列表并将其直接放入映射中。这是更有效的,因为我可以更快地发布数据,创建的对象也更少,而同步仅基于现成的组件构建,这有利于提高可读性。

这是我的构建(为简洁起见,省略了一些不太重要的部分)

public class SocketManager {    private static final Random random = new Random();    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap    private final ZContext ctx = new ZContext();    // ...    private SocketManager() {      connectToZMQSockets();      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);    }    // during startup, making a connection and populate once    private void connectToZMQSockets() {      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map      }    }    // ...    // this method will be called by multiple threads to get the next live socket    // is there any concurrency or thread safety issue or race condition here?    public Optional<SocketHolder> getNextSocket() {      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>        if (liveSocket.isPresent()) {          return liveSocket;        }      }      return Optional.absent();    }    // is there any concurrency or thread safety issue or race condition here?    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {      if (!CollectionUtils.isEmpty(listOfEndPoints)) {        // The list of live sockets        List<SocketHolder> liveonly = new ArrayList<>(listOfEndPoints.size());        for (SocketHolder obj : listOfEndPoints) {          if (obj.isLive()) { liveOnly.add(obj);          }        }        if (!liveOnly.isEmpty()) {          // The list is not empty so we shuffle it an return the first element          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one        }      }      return Optional.absent();    }    // no need to make this synchronized    private void updateLiveSockets() {      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();        for (SocketHolder liveSocket : liveSockets) { // LINE A          Socket socket = liveSocket.getSocket();          String endpoint = liveSocket.getEndpoint();          Map<byte[], byte[]> holder = populateMap();          Message message = new Message(holder, Partition.COMMAND);          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);          boolean isLive = (status) ? true : false;          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);          liveUpdatedSockets.add(zmq);        }        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.      }    }}


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/470221.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号