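To be thread safe, your code must synchronize every access to all shared mutable state.

Here you share `liveSocketsByDatacenter`, an instance of `HashMap`, a *non-thread-safe* implementation of `Map`. It can potentially be read (by `updateLiveSockets` and `getNextSocket`) and modified (by `connectToZMQSockets` and `updateLiveSockets`) concurrently without any synchronization, which is already enough to make your code non-thread-safe. Moreover, the values of this `Map` are instances of `ArrayList`, a *non-thread-safe* implementation of `List`, which can also potentially be read (by `getNextSocket` and `updateLiveSockets`) and modified (by `getLiveSocket`, more precisely by `Collections.shuffle`) concurrently.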
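To see why `Collections.shuffle` counts as a modification, here is a minimal single-threaded sketch. The class and method names (`ShuffleInPlaceDemo`, `shuffleInPlace`, `shuffledCopy`) are hypothetical and only serve the illustration. It shows that shuffling writes to the list it is given (an unmodifiable view rejects the call), while shuffling a private copy leaves the shared list untouched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class ShuffleInPlaceDemo {

    /** Shuffles the list it is given: the caller's list is reordered in place. */
    public static void shuffleInPlace(List<Integer> list, Random rnd) {
        Collections.shuffle(list, rnd);
    }

    /** Shuffles a private copy and returns it: the caller's list is not touched. */
    public static List<Integer> shuffledCopy(List<Integer> list, Random rnd) {
        List<Integer> copy = new ArrayList<>(list);
        Collections.shuffle(copy, rnd);
        return copy;
    }

    public static void main(String[] args) {
        List<Integer> shared = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));

        // Shuffling a copy: the shared list keeps its original order.
        List<Integer> copy = shuffledCopy(shared, new Random());
        System.out.println("copy:   " + copy);
        System.out.println("shared: " + shared);

        // Shuffling through a read-only view fails, because shuffle calls set().
        try {
            shuffleInPlace(Collections.unmodifiableList(shared), new Random());
        } catch (UnsupportedOperationException e) {
            System.out.println("shuffle needs write access to the list");
        }
    }
}
```

Any other thread holding the same `ArrayList` reference would observe those in-place writes, which is why an unsynchronized shuffle of a shared list is a problem.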
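The simple way to fix your 2 thread safety issues could be to:

- Use a `ConcurrentHashMap` instead of a `HashMap` for your variable `liveSocketsByDatacenter`, as it is a natively thread-safe implementation of `Map`.
- Put the *unmodifiable* version of your `ArrayList` instances as the values of your map using `Collections.unmodifiableList(List<? extends T> list)`. Your lists are then effectively immutable (as long as nothing keeps modifying the underlying `ArrayList`), and therefore thread safe.

  For example:

      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
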
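- Rewrite your method `getLiveSocket` to avoid calling `Collections.shuffle` directly on your list. You could, for example, shuffle only the list of live sockets instead of all sockets, or use a copy of your list (for example `new ArrayList<>(listOfEndPoints)`) instead of the list itself.

  For example:

      private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
          if (!CollectionUtils.isEmpty(listOfEndPoints)) {
              // The list of live sockets
              List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
              for (SocketHolder obj : listOfEndPoints) {
                  if (obj.isLive()) {
                      liveOnly.add(obj);
                  }
              }
              if (!liveOnly.isEmpty()) {
                  // The list is not empty, so we shuffle it and return the first element
                  Collections.shuffle(liveOnly);
                  return Optional.of(liveOnly.get(0));
              }
          }
          return Optional.absent();
      }

For the first point, since you seem to read your map frequently and modify it rarely (only once every 30 seconds), you could consider rebuilding your map and then sharing its unmodifiable version (using `Collections.unmodifiableMap(Map<? extends K, ? extends V> m)`) every 30 seconds. This approach is very efficient in mostly-read scenarios, because you no longer pay the price of any synchronization mechanism to access the content of your map.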
Your code would then be:

    // The field is no longer final. It is now volatile, so that a newly
    // published map is visible to all threads that read the field afterwards
    private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter
        = Collections.unmodifiableMap(new HashMap<>());

    private void connectToZMQSockets() {
        Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
        // The map in which I put all the live sockets
        Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>();
        for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
            List<SocketHolder> addedColoSockets = connect(
                entry.getKey(), entry.getValue(), ZMQ.PUSH
            );
            liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets));
        }
        // Set the new content of my map as an unmodifiable map
        this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets);
    }

    public Optional<SocketHolder> getNextSocket() {
        // For the sake of consistency, use the same map instance throughout
        // the method by reading the entries from the local variable instead
        // of the member variable
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = this.liveSocketsByDatacenter;
        ...
    }

    ...

    // The synchronized modifier prevents concurrent modifications. It is needed
    // because building the new map requires reading the old one first, so both
    // steps must be done atomically to prevent consistency issues
    private synchronized void updateLiveSockets() {
        // Initialize my new map with the current map content
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
            new HashMap<>(this.liveSocketsByDatacenter);
        Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;

        for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
            ...
            // Replace the entry in the new map with the refreshed, unmodifiable list
            liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
        }
        // Set the new content of my map as an unmodifiable map
        this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter);
    }

Your field `liveSocketsByDatacenter` could also be of type `AtomicReference<Map<Datacenters, List<SocketHolder>>>`. It would then be `final`; your map would still be stored in a `volatile` variable, but inside the `AtomicReference` class.
The previous code would then be:

    private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter
        = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));

    ...

    private void connectToZMQSockets() {
        ...
        // Update the map content
        this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets));
    }

    public Optional<SocketHolder> getNextSocket() {
        // For the sake of consistency, use the same map instance throughout
        // the method by reading the entries from the local variable instead
        // of the member variable
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
            this.liveSocketsByDatacenter.get();
        ...
    }

    // The synchronized modifier prevents concurrent modifications. It is needed
    // because building the new map requires reading the old one first, so both
    // steps must be done atomically to prevent consistency issues
    private synchronized void updateLiveSockets() {
        // Initialize my new map with the current map content
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
            new HashMap<>(this.liveSocketsByDatacenter.get());
        ...
        // Update the map content
        this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
    }
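Since the snippets here elide parts of your class, the following is a self-contained sketch of the same publish-a-snapshot idea that can be compiled and run on its own. It is not your actual class: `SnapshotRegistry`, `replace`, `shuffledSockets` and `current` are hypothetical names, and plain `String` stands in for both `Datacenters` and `SocketHolder`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in: String replaces both Datacenters and SocketHolder. */
public class SnapshotRegistry {

    // The reference always points to an unmodifiable map of unmodifiable lists.
    private final AtomicReference<Map<String, List<String>>> snapshot =
        new AtomicReference<>(Collections.<String, List<String>>emptyMap());

    /** Writer side: copy the current content, apply the change, publish a new snapshot. */
    public synchronized void replace(String datacenter, List<String> sockets) {
        Map<String, List<String>> next = new HashMap<>(snapshot.get());
        // Defensive copy, so later changes to the caller's list cannot leak in
        next.put(datacenter, Collections.unmodifiableList(new ArrayList<>(sockets)));
        snapshot.set(Collections.unmodifiableMap(next));
    }

    /** Reader side: read the reference once and work on that snapshot only. */
    public List<String> shuffledSockets(String datacenter) {
        Map<String, List<String>> current = snapshot.get();
        List<String> sockets = current.get(datacenter);
        if (sockets == null) {
            return Collections.emptyList();
        }
        // Shuffle a private copy, never the shared list
        List<String> copy = new ArrayList<>(sockets);
        Collections.shuffle(copy);
        return copy;
    }

    /** Returns the snapshot that is currently published. */
    public Map<String, List<String>> current() {
        return snapshot.get();
    }
}
```

Readers never lock: they take whatever snapshot is current and keep using it, even if a writer publishes a newer one in the meantime. Only writers are serialized, which fits a map that is read often and rebuilt every 30 seconds.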


