我有两个类
类一WebsocketSessionCache是一个单例缓存,用于保存一些key,value信息。
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class WebsocketSessionCache {
private static final Logger logger = LogManager.getLogger();
private ListMultimap<String, Integer> cache = null;
private ScheduledExecutorService executorService = null;
// private final Object lock = new Object();
private static ReentrantLock lock = new ReentrantLock();
private WebsocketSessionCache() {
logger.debug("构造WebsocketSession缓存");
cache = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
logger.debug("开启定时清理线程");
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(createClearThread(), 0, 10, TimeUnit.SECONDS);
}
static class WebsocketSessionCacheHolder {
static WebsocketSessionCache sessionCache = new WebsocketSessionCache();
}
public static WebsocketSessionCache getInstance() {
return WebsocketSessionCacheHolder.sessionCache;
}
public void put(String key, Integer session) {
try {
lock.lock();
cache.put(key, session);
} catch (Exception e) {
logger.error("写入会话异常 e = {}", e);
} finally {
lock.unlock();
}
}
public void remove(String key, Integer session) {
try {
// lock.lock();
cache.remove(key, session);
} catch (Exception e) {
logger.error("清理会话异常 e = {}", e);
} finally {
// lock.lock();
}
}
public ListMultimap<String, Integer> getOpenSessions() {
ListMultimap<String, Integer> openSession = ArrayListMultimap.create();
try {
WebsocketSessionCache.lock.lock();
for (Map.Entry<String, Integer> e : WebsocketSessionCache.getInstance().cache.entries()) {
if (e.getValue() == 2)
openSession.put(e.getKey(), e.getValue());
}
} catch (Exception e) {
logger.error("读取Open会话异常 e = {}", e);
} finally {
WebsocketSessionCache.lock.unlock();
}
return openSession;
}
public void clear() {
try {
logger.debug("清理会话线程 time = {}", System.currentTimeMillis());
//在缓存中移除无效的会话
Thread.currentThread().setName("createClearThread");
lock.lock();
Iterator<Map.Entry<String, Integer> > iterator = cache.entries().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Integer> e = iterator.next();
if (e.getValue() != 2) {
logger.debug("清理会话[key={},sessionId={}]", e.getKey(), e.getValue());
remove(e.getKey(), e.getValue());
}
}
} catch (Exception e) {
logger.error("清理会话线程异常 e = {}", e);
} finally {
lock.unlock();
}
}
public void destroy() {
cache.clear();
executorService.shutdownNow();
}
private Runnable createClearThread() {
Runnable t = new Runnable() {
public void run() {
clear();
}
};
return t;
}
}
另一个类
public class Test {
public static void main(String str[]) {
SendHotelRealtimeDataTake sendHotelRealtimeDataTake = new SendHotelRealtimeDataTake();
new Thread(() -> {
while (true)
for (int i = 0; i < 5; i++) {
System.out.println("main run ...");
WebsocketSessionCache.WebsocketSessionCacheHolder.sessionCache.put(String.valueOf(i), i);
try {
Thread.currentThread().setName("test main");
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
往缓存类中put数据。
现在出现问题就是在缓存中的clear()方法中,我已经加了锁,为什么还报并发异常。
清理会话线程异常 e = {} java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
at java.util.ArrayList$Itr.next(ArrayList.java:851)
at com.google.common.collect.AbstractMapBasedMultimap$Itr.next(AbstractMapBasedMultimap.java:1150)
at com.sensetime.ad.sensefocus.websocket.WebsocketSessionCache.clear(WebsocketSessionCache.java:101)
at com.sensetime.ad.sensefocus.websocket.WebsocketSessionCache$1.run(WebsocketSessionCache.java:125)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)