您使用以下同步技术。
- 具有实时套接字数据的映射位于原子引用的后面,因此可以安全地切换映射。
- 该
updateLiveSockets()
方法是同步的(隐式地),这将防止同时通过两个线程切换映射。 - 如果在
getNextSocket()
方法期间发生切换,则在使用地图时要对地图进行本地引用以避免混淆。
是否像现在一样是线程安全的?
线程安全性始终取决于共享的可变数据上是否存在正确的同步。在这种情况下,共享的可变数据是数据中心到其SocketHolders列表的映射。
地图位于中
AtomicReference,并制作本地副本以供使用的事实足以在地图上实现同步。您的方法采用了地图的一个版本,并使用该版本,由于的性质,切换版本是线程安全的
AtomicReference。只需为地图创建member字段,也可以实现此目的
volatile,因为您要做的只是更新参考(您无需对其进行任何先检查后操作操作)。
由于可以
scheduleAtFixedRate()确保所传递的消息
Runnable不会与自身并发运行,因此不需要
synchronizedon
updateLiveSockets(),但是它也没有任何实质性的危害。
所以是的,该类是线程安全的。
但是,尚不清楚a是否
SocketHolder可以同时被多个线程使用。照原样,此类仅尝试
SocketHolder通过选择一个随机活动对象来最大程度地减少对s的并发使用(尽管无需重新整理整个数组以选择一个随机索引)。它实际上没有阻止并发使用。
可以提高效率吗?
我相信可以。当查看该
updateLiveSockets()方法时,似乎它会构建完全相同的映射,只是
SocketHolders的
isLive标志值可能不同。这使我得出结论,我只想切换地图中的每个列表,而不是切换整个地图。为了以线程安全的方式更改映射中的条目,我可以使用
ConcurrentHashMap。
如果我使用
ConcurrentHashMap,并且不切换地图,而是切换地图中的值,则可以摆脱
AtomicReference。
要更改映射,我可以构建新列表并将其直接放入映射中。这是更有效的,因为我可以更快地发布数据,创建的对象也更少,而同步仅基于现成的组件构建,这有利于提高可读性。
这是我的构建(为简洁起见,省略了一些不太重要的部分)
public class SocketManager { private static final Random random = new Random(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap private final ZContext ctx = new ZContext(); // ... private SocketManager() { connectToZMQSockets(); scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS); } // during startup, making a connection and populate once private void connectToZMQSockets() { Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH); liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map } } // ... // this method will be called by multiple threads to get the next live socket // is there any concurrency or thread safety issue or race condition here? public Optional<SocketHolder> getNextSocket() { for (Datacenters dc : Datacenters.getOrderedDatacenters()) { Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder> if (liveSocket.isPresent()) { return liveSocket; } } return Optional.absent(); } // is there any concurrency or thread safety issue or race condition here? private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) { if (!CollectionUtils.isEmpty(listOfEndPoints)) { // The list of live sockets List<SocketHolder> liveonly = new ArrayList<>(listOfEndPoints.size()); for (SocketHolder obj : listOfEndPoints) { if (obj.isLive()) { liveOnly.add(obj); } } if (!liveOnly.isEmpty()) { // The list is not empty so we shuffle it an return the first element return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one } } return Optional.absent(); } // no need to make this synchronized private void updateLiveSockets() { Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS; for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); List<SocketHolder> liveUpdatedSockets = new ArrayList<>(); for (SocketHolder liveSocket : liveSockets) { // LINE A Socket socket = liveSocket.getSocket(); String endpoint = liveSocket.getEndpoint(); Map<byte[], byte[]> holder = populateMap(); Message message = new Message(holder, Partition.COMMAND); boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket); boolean isLive = (status) ? true : false; SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); liveUpdatedSockets.add(zmq); } liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner. } }}


