支持线程安全、任务的取消的TimingWheel java实现方式
时间: 2024-05-06 15:15:52 浏览: 135
TimingWheel是常用的一种时间轮算法,它可以用来实现延时任务或者定时任务的调度。在Java中,我们可以使用ScheduledExecutorService来实现定时任务,但是它不支持任务的取消和线程安全。因此,我们可以通过自己实现一个TimingWheel来满足这些需求。
以下是一个基于Java的线程安全的TimingWheel实现方式:
```java
import java.util.concurrent.*;
public class TimingWheel {
private final int tickMs;
private final int wheelSize;
private final long startMs;
private final TaskList[] wheel;
private final Executor executor;
private final ScheduledExecutorService timer;
private final Semaphore semaphore = new Semaphore(1);
private long currentTimeMs;
private int currentTickIndex;
public TimingWheel(int tickMs, int wheelSize, long startMs, Executor executor) {
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.startMs = startMs;
this.wheel = new TaskList[wheelSize];
for (int i = 0; i < wheelSize; ++i) {
wheel[i] = new TaskList();
}
this.executor = executor;
this.timer = Executors.newSingleThreadScheduledExecutor();
this.currentTimeMs = startMs - (startMs % tickMs);
this.currentTickIndex = 0;
}
public void addTask(Task task) {
long expirationMs = task.getExpirationMs();
if (expirationMs < currentTimeMs + tickMs) {
executor.execute(task);
} else {
if (expirationMs > startMs + tickMs * wheelSize) {
throw new IllegalArgumentException("Expiration " + expirationMs + " is out of bounds");
}
int idx = (int) ((expirationMs / tickMs - startMs / tickMs) % wheelSize);
TaskList list = wheel[idx];
list.add(task);
}
}
public void start() {
timer.scheduleAtFixedRate(() -> {
try {
semaphore.acquire();
advanceClock();
tick();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
}, tickMs, tickMs, TimeUnit.MILLISECONDS);
}
private void advanceClock() {
currentTimeMs += tickMs;
currentTickIndex = (int) (currentTimeMs / tickMs % wheelSize);
}
private void tick() {
TaskList list = wheel[currentTickIndex];
list.flush(executor);
list.clear();
}
public void shutdown() {
timer.shutdown();
executor.execute(() -> {
for (TaskList list : wheel) {
list.flush(executor);
list.clear();
}
});
}
private static class TaskList {
private final ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>();
public void add(Task task) {
taskQueue.add(task);
}
public void flush(Executor executor) {
Task task;
while ((task = taskQueue.poll()) != null) {
if (!task.isCancelled()) {
executor.execute(task);
}
}
}
public void clear() {
taskQueue.clear();
}
}
public interface Task extends Runnable {
long getExpirationMs();
boolean isCancelled();
}
}
```
在上面的实现中,我们使用了Semaphore来保证线程安全,使用ScheduledExecutorService来实现定时任务,并且支持任务的取消。当需要添加任务时,我们首先判断任务的过期时间是否在当前时间轮的范围内,如果是,则直接执行任务;如果不是,则将任务添加到对应的槽中。每个槽都是一个TaskList,它是一个线程安全的ConcurrentLinkedQueue。每一次tick时,我们会刷新当前槽中的所有任务,并将其清空。
使用示例:
```java
public class TimingWheelDemo {
public static void main(String[] args) throws InterruptedException {
Executor executor = Executors.newFixedThreadPool(10);
TimingWheel timingWheel = new TimingWheel(100, 20, System.currentTimeMillis(), executor);
timingWheel.start();
for (int i = 0; i < 100; i++) {
timingWheel.addTask(new Task(i * 1000 + System.currentTimeMillis()) {
@Override
public void run() {
System.out.println("Task " + this.getExpirationMs() + " is running");
}
@Override
public boolean isCancelled() {
return false;
}
});
}
Thread.sleep(30000);
timingWheel.shutdown();
((ExecutorService) executor).shutdown();
}
private static abstract class Task implements TimingWheel.Task {
private final long expirationMs;
Task(long expirationMs) {
this.expirationMs = expirationMs;
}
@Override
public long getExpirationMs() {
return expirationMs;
}
}
}
```
上面的示例中,我们创建一个TimingWheel实例,然后添加了100个任务。每个任务的过期时间依次增加1000毫秒,并在定时任务执行时打印出过期时间。我们使用了一个简单的抽象类Task来实现TimingWheel.Task接口,其中getExpirationMs方法返回过期时间。当然,这里的任务并没有实现取消操作,如果需要支持取消,可以在Task中添加一个isCancelled方法来判断任务是否已取消。
阅读全文