飞道的博客

大容量列表如何支持元素快速存取、删除的同时支持高并发遍历

348人阅读  评论(0)

大容量列表如何支持元素快速存取、删除的同时支持高并发遍历

问题分析

当我们拿到一个大容量列表,如果需要进行快速存取和删除元素操作,毋庸置疑需要使用HashSet容器,但如果还需要支持遍历操作则需要考虑不能在容器本身上进行遍历,因为在遍历时如果发生了元素的删除或加入操作则会导致遍历失败,那么需要将原容器复制出一份进行遍历,但是在高并发情况下,如果每次遍历都需要将原容器复制一份,一是大容量列表复制需要循环很多次才能复制成功,消耗cpu,另外复制过程中还需要加锁,防止复制过程中元素增减导致复制失败,这样程序整个性能肯定达不到要求。

解决思路

整个问题中最关键的点是遍历的问题,因为按正常方式遍历需要消耗的资源和时间是最多的,如果每次遍历时不需要复制、不需要加锁那么问题就解决了,这里目前能想到的方法是对原容器做冗余,在每次原容器变化时冗余一份,我们在冗余的这份数据进行遍历,但是如果不对冗余的容器对象进行管理很容易到内存泄漏,因为容器里的元素生命周期是比较长的,会导致最后一次元素修改之前生成的冗余容器无法回收,那这里的问题就转换为对冗余容器的管理。

因为遍历操作可能同时在多个线程进行,当加入或删除元素时需要将正在使用的列表清空保证能回收,如果等待所有线程遍历结束再清理,一是需要知道遍历何时结束,另外是需要在等待的时候不能有其它线程进行元素增删操作,但这样会导致性能严重下降,所以这里需要一个不需要等待的解决方法,解决方式是使用多版本进行控制,每一次获取元素列表时记录该列表的引用次数,遍历操作后都需要调用归还方法进行归还,但归还时该版本的元素列表可能已经过期,如果已经过期且引用数为0则清理该列表,否则将该列表标记为已过期版本,等待最后一个使用线程归还后进行清理。

解决步骤

使用一个set存储元素,支持快速存取和删除

/**
	 * 保存元素的集合容器
	 */
	private final Set<E> E_SET = Collections.newSetFromMap(new ConcurrentHashMap<>());

使用一个list元素,用于支持遍历操作(这里使用了一个继承ArrayList的自定义类,后面会有解释)

/**
	 * 返回用于遍历的列表容器
	 */
	private volatile CompareList<E> ITER_LIST = new CompareList<>(0);

使用一个队列记录需要清理元素的列表,当一个列表不再使用时放入该队列,由异步线程进行清理,保证高效率

/**
	 * 等待清理的列表容器
	 */
	private LinkedBlockingDeque<CompareList<E>> WAIT_CLOSE_QUEUE = new LinkedBlockingDeque<>(); 

线程监听队列

/**
	 * 开启监听待清理列表线程,当拿到列表时将元素清空
	 */
	private void startClearThread() {
		
		Thread thread = new Thread(new Runnable() {

			@Override
			public void run() {
				
				CompareList<E> list = null;
				try {
					while(null != (list = WAIT_CLOSE_QUEUE.take())) {
						// 清除列表元素
						list.clear();
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
			}
			
		});
		thread.setDaemon(true);
		thread.start();
	}

为了能记录历史需要清理的列表,这里使用继承ArrayList的自定义类,其中添加两个属性

	// 记录当前引用次数
	private AtomicInteger quoteCount = new AtomicInteger(0);
	
	/**
	 * 是否已过期,true则表示已过期
	 */
	volatile boolean expire = false;

使用一个读写锁控制在元素增删时发生的复制操作(获取写锁)和获取遍历列表操作(获取读锁)

/**
	 * 当需要更新遍历列表时需要获取写锁,需要获取遍历列表时需要获取读锁
	 */
	private ReentrantReadWriteLock SET_GET_LOCK = new ReentrantReadWriteLock();

元素更新时获取写锁后进行更新,保证复制操作不被打断

/**增加元素并更新用于遍历的列表
	 * @param c
	 * @throws InterruptedException
	 */
	private void doAddE(Collection<? extends E> c) throws InterruptedException {
		if(null == c || c.isEmpty()) {
			return;
		}
		
		WriteLock writeLock = SET_GET_LOCK.writeLock();
		writeLock.lockInterruptibly();
		try {
			E_SET.addAll(c);
			
			newInterList();
		} finally {
			writeLock.unlock();
		}
	}

更新用于遍历的列表时如果旧列表未被引用则直接添加到待清理队列否则标记未待归还后清理

	/**
	 * 创建一个新的遍历列表并按情况处理旧列表
	 */
	private void newInterList() {
		
		if(!ITER_LIST.quoteCountZero()) {
			// 如果引用数不为0则添加到待归还后清理
			ITER_LIST.expire = true;
		} else {
			// 没有引用则直接添加到待清理队列
			WAIT_CLOSE_QUEUE.push(ITER_LIST);
		}
		
		ITER_LIST = new CompareList<>(E_SET);
	}

列表归还方法,判断是否还有引用以及是否已过期

/**使用完毕后归还列表
	 * @param interList
	 * @throws InterruptedException 
	 */
	public void backList(CompareList<E> interList) throws InterruptedException {
		
		ReadLock readLock = SET_GET_LOCK.readLock();
		readLock.lockInterruptibly();
		try {
			interList.incAndGetQuoteCount(-1);
			boolean zero = interList.quoteCountZero();
			boolean expire = interList.expire;
			if(zero && expire) {
				WAIT_CLOSE_QUEUE.push(interList);
			}
			
		} finally {
			readLock.unlock();
		}
		
	}

通过上面的步骤解决了大容量列表支持快速存取、删除的同时还支持同时很多线程对其进行遍历。如果大家还有更好的方法欢迎指教。

完整代码

package javaxx;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

public class BigList <E> {
	
	/**
	 * 保存元素的集合容器
	 */
	private final Set<E> E_SET = Collections.newSetFromMap(new ConcurrentHashMap<>());
	
	/**
	 * 返回用于遍历的列表容器
	 */
	private volatile CompareList<E> ITER_LIST = new CompareList<>(0);
	
	/**
	 * 等待清理的列表容器
	 */
	private LinkedBlockingDeque<CompareList<E>> WAIT_CLOSE_QUEUE = new LinkedBlockingDeque<>(); 
	
	/**
	 * 当需要更新遍历列表时需要获取写锁,需要获取遍历列表时需要获取读锁
	 */
	private ReentrantReadWriteLock SET_GET_LOCK = new ReentrantReadWriteLock();
	
	public BigList() {
		startClearThread();
	}
	
	public BigList(Collection<? extends E> c) throws InterruptedException {
		this();
		addALLE(c);
	}
	
	/**获取遍历的列表
	 * @return
	 * @throws InterruptedException
	 */
	public CompareList<E> getList() throws InterruptedException {
		
		ReadLock readLock = SET_GET_LOCK.readLock();
		readLock.lockInterruptibly();
		try {
			ITER_LIST.incAndGetQuoteCount(1);
			return ITER_LIST;
		} finally {
			readLock.unlock();
		}
		
	}
	
	/**
	 * @param interList
	 * @throws InterruptedException 
	 */
	public void backList(CompareList<E> interList) throws InterruptedException {
		
		ReadLock readLock = SET_GET_LOCK.readLock();
		readLock.lockInterruptibly();
		try {
			interList.incAndGetQuoteCount(-1);
			boolean zero = interList.quoteCountZero();
			boolean expire = interList.expire;
			if(zero && expire) {
				WAIT_CLOSE_QUEUE.push(interList);
			}
			
		} finally {
			readLock.unlock();
		}
		
	}
	
	/**增加多个元素
	 * @param c
	 * @throws InterruptedException
	 */
	public void addALLE(Collection<? extends E> c) throws InterruptedException {
		doAddE(c);
	}
	
	/**增加单个元素
	 * @param e
	 * @throws InterruptedException
	 */
	public void addE(E e) throws InterruptedException {
		doAddE(Arrays.asList(e));
	}
	
	/**删除多个元素
	 * @param c
	 * @throws InterruptedException
	 */
	public void removeAllE(Collection<? extends E> c) throws InterruptedException {
		doRemoveE(c);
	}
	
	/**删除单个元素
	 * @param e
	 * @throws InterruptedException
	 */
	public void removeE(E e) throws InterruptedException {
		doRemoveE(Arrays.asList(e));
	}
	
	/**是否包含给定的所有元素
	 * @param c
	 * @return
	 */
	public boolean containsAll(Collection<? extends E> c) {
		return E_SET.containsAll(c);
	}
	
	/**是否包含给定的元素
	 * @param e
	 * @return
	 */
	public boolean contains(E e) {
		return E_SET.contains(e);
	}
	
	/**增加元素并更新用于遍历的列表
	 * @param c
	 * @throws InterruptedException
	 */
	private void doAddE(Collection<? extends E> c) throws InterruptedException {
		if(null == c || c.isEmpty()) {
			return;
		}
		
		WriteLock writeLock = SET_GET_LOCK.writeLock();
		writeLock.lockInterruptibly();
		try {
			E_SET.addAll(c);
			
			newInterList();
		} finally {
			writeLock.unlock();
		}
	}
	
	/**移除元素并更新用于遍历的列表
	 * @param c
	 * @throws InterruptedException
	 */
	private void doRemoveE(Collection<? extends E> c) throws InterruptedException {
		if(null == c || c.isEmpty()) {
			return;
		}
		
		WriteLock writeLock = SET_GET_LOCK.writeLock();
		writeLock.lockInterruptibly();
		try {
			E_SET.removeAll(c);
			
			newInterList();
		} finally {
			writeLock.unlock();
		}
	}
	
	/**
	 * 创建一个新的遍历列表并按情况处理旧列表
	 */
	private void newInterList() {
		
		if(!ITER_LIST.quoteCountZero()) {
			// 如果引用数不为0则添加到待归还后清理
			ITER_LIST.expire = true;
		} else {
			// 没有引用则直接添加到待清理队列
			WAIT_CLOSE_QUEUE.push(ITER_LIST);
		}
		
		ITER_LIST = new CompareList<>(E_SET);
	}
	
	/**
	 * 开启监听待清理列表线程,当拿到列表时将元素清空
	 */
	private void startClearThread() {
		
		Thread thread = new Thread(new Runnable() {

			@Override
			public void run() {
				
				CompareList<E> list = null;
				try {
					while(null != (list = WAIT_CLOSE_QUEUE.take())) {
						// 清除列表元素
						list.clear();
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
			}
			
		});
		thread.setDaemon(true);
		thread.start();
	}
	
}

/** 重新实现equals方法和hashCode方法,方便将列表实例放入Hash容器中进行存取
 * @author rhc
 *
 * @param <E>
 */
class CompareList <E> extends ArrayList<E> {
	
	private AtomicInteger quoteCount = new AtomicInteger(0);
	
	/**
	 * 是否已过期,true则表示已过期
	 */
	volatile boolean expire = false;
	
	/**
     * Constructs an empty list with the specified initial capacity.
     *
     * @param  initialCapacity  the initial capacity of the list
     * @throws IllegalArgumentException if the specified initial capacity
     *         is negative
     */
    public CompareList(int initialCapacity) {
        super(initialCapacity);
    }

    /**
     * Constructs an empty list with an initial capacity of ten.
     */
    public CompareList() {
        super();
    }

    /**
     * Constructs a list containing the elements of the specified
     * collection, in the order they are returned by the collection's
     * iterator.
     *
     * @param c the collection whose elements are to be placed into this list
     * @throws NullPointerException if the specified collection is null
     */
    public CompareList(Collection<? extends E> c) {
        super(c);
    }
    
    /**增加引用数并返回增加后的引用数
     * @param inc 增加的引用数
     * @return
     */
    public int incAndGetQuoteCount(int inc) {
    	return quoteCount.addAndGet(inc);
    }
    
    /**获取当前引用数
     * @return
     */
    public int getQuoteCount() {
    	return quoteCount.get();
    }
    
    /**当前引用数是否为0
     * @return
     */
    public boolean quoteCountZero() {
    	return 0 == quoteCount.get();
    }
    
}


转载:https://blog.csdn.net/veriloglogo/article/details/105603243
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场