Java 中的 ThreadPoolExecutor 任务去重
时间: 2023-11-21 17:57:11 浏览: 75
Java进阶之ThreadPoolExecutor
ThreadPoolExecutor 中并没有提供任务去重的功能,但是我们可以通过自定义一个任务队列来实现任务去重。具体实现方法如下:
1. 自定义一个任务队列,继承自 ArrayBlockingQueue,重写 add() 和 offer() 方法(ThreadPoolExecutor 入队时调用的是 offer()),在添加任务时通过 contains() 判断任务是否已经存在于队列中,如果存在则不添加,否则添加。
```java
/**
 * A bounded task queue that refuses to enqueue a task which is already waiting in it.
 *
 * <p>Two tasks are considered duplicates when {@link Object#equals(Object)} says so, because
 * the duplicate check is delegated to {@link #contains(Object)}. Tasks that should be
 * de-duplicated by some key must therefore implement {@code equals}/{@code hashCode}.
 *
 * <p>Only {@link #add(Runnable)} and {@link #offer(Runnable)} are de-duplicated; the inherited
 * {@code put} and timed {@code offer} are not overridden and still accept duplicates.
 *
 * <p>Note for use with {@code ThreadPoolExecutor}: the executor treats a {@code false} result
 * from {@code offer} as "could not be queued", so per its documentation it will then try to
 * start an extra thread for the task or hand it to the rejection handler.
 */
public class CustomTaskQueue extends ArrayBlockingQueue<Runnable> {

    /**
     * Creates an empty queue with the given fixed capacity.
     *
     * @param capacity maximum number of pending tasks
     */
    public CustomTaskQueue(int capacity) {
        super(capacity);
    }

    /**
     * Inserts the task unless an equal task is already queued.
     *
     * @param runnable the task to enqueue
     * @return {@code true} if the task was inserted, {@code false} if it was a duplicate
     * @throws IllegalStateException if the task is not a duplicate and the queue is full
     * @throws NullPointerException if {@code runnable} is null
     */
    @Override
    public synchronized boolean add(Runnable runnable) {
        // The monitor makes "check, then insert" atomic with respect to other producers;
        // without it two threads could both pass the check and enqueue the same task.
        if (contains(runnable)) {
            return false;
        }
        return super.add(runnable);
    }

    /**
     * Inserts the task unless an equal task is already queued or the queue is full.
     *
     * @param runnable the task to enqueue
     * @return {@code true} if the task was inserted, {@code false} if it was a duplicate or
     *         the queue had no free slot
     * @throws NullPointerException if {@code runnable} is null
     */
    @Override
    public synchronized boolean offer(Runnable runnable) {
        // Same monitor as add(), so both insertion paths are serialized against each other.
        if (contains(runnable)) {
            return false;
        }
        return super.offer(runnable);
    }
}
```
2. 自定义一个 ThreadPoolExecutor 子类,提供与父类一一对应的构造方法,由调用方在构造时传入自定义的任务队列。
```java
/**
 * A {@link ThreadPoolExecutor} subclass that exposes the same four constructors as its parent.
 *
 * <p>The class adds no behaviour of its own: every constructor forwards its arguments
 * unchanged, and all other methods are inherited. Any special queueing behaviour comes from
 * the {@code workQueue} instance the caller passes in.
 */
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * Creates an executor using the parent's default thread factory and rejection handler.
     *
     * @param corePoolSize number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime how long excess idle threads wait for work before terminating
     * @param unit time unit of {@code keepAliveTime}
     * @param workQueue queue holding tasks before they are executed
     */
    public CustomThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Creates an executor with a custom thread factory and the parent's default rejection
     * handler.
     *
     * @param corePoolSize number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime how long excess idle threads wait for work before terminating
     * @param unit time unit of {@code keepAliveTime}
     * @param workQueue queue holding tasks before they are executed
     * @param threadFactory factory used when the executor creates a new thread
     */
    public CustomThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /**
     * Creates an executor with a custom rejection handler and the parent's default thread
     * factory.
     *
     * @param corePoolSize number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime how long excess idle threads wait for work before terminating
     * @param unit time unit of {@code keepAliveTime}
     * @param workQueue queue holding tasks before they are executed
     * @param handler handler invoked when a task cannot be accepted
     */
    public CustomThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    /**
     * Creates an executor with both a custom thread factory and a custom rejection handler.
     *
     * @param corePoolSize number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime how long excess idle threads wait for work before terminating
     * @param unit time unit of {@code keepAliveTime}
     * @param workQueue queue holding tasks before they are executed
     * @param threadFactory factory used when the executor creates a new thread
     * @param handler handler invoked when a task cannot be accepted
     */
    public CustomThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    // No method overrides: the previous ones only forwarded to super, so the inherited
    // implementations provide exactly the same behaviour.
}
```
3.使用自定义的 ThreadPoolExecutor 创建线程池。
```java
// Work queue that holds at most 10 pending tasks.
CustomTaskQueue taskQueue = new CustomTaskQueue(10);

// Thread pool backed by the queue above.
CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(
        5,                 // corePoolSize
        10,                // maximumPoolSize
        60,                // keepAliveTime
        TimeUnit.SECONDS,  // unit of keepAliveTime
        taskQueue);        // workQueue
```
阅读全文