package com.qianwen.smartman.common.utils;
|
|
import cn.hutool.core.date.DateUtil;
|
import com.google.common.collect.Queues;
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.List;
|
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.TimeUnit;
|
import java.util.function.Consumer;
|
import com.qianwen.smartman.common.constant.DateConstant;
|
|
public class DelayList<T> {
|
private final int maxWait;
|
private final int maxLen;
|
private BlockingQueue<T> queue;
|
private Consumer<List<T>> consumer;
|
private Thread timer;
|
|
public void setQueue(final BlockingQueue<T> queue) {
|
this.queue = queue;
|
}
|
|
public void setConsumer(final Consumer<List<T>> consumer) {
|
this.consumer = consumer;
|
}
|
|
public void setTimer(final Thread timer) {
|
this.timer = timer;
|
}
|
|
public boolean equals(final Object o) {
|
if (o == this) {
|
return true;
|
}
|
if (o instanceof DelayList) {
|
DelayList<?> other = (DelayList<?>) o;
|
if (other.canEqual(this) && getMaxWait() == other.getMaxWait() && getMaxLen() == other.getMaxLen()) {
|
Object this$queue = getQueue();
|
Object other$queue = other.getQueue();
|
if (this$queue == null) {
|
if (other$queue != null) {
|
return false;
|
}
|
} else if (!this$queue.equals(other$queue)) {
|
return false;
|
}
|
Object this$consumer = getConsumer();
|
Object other$consumer = other.getConsumer();
|
if (this$consumer == null) {
|
if (other$consumer != null) {
|
return false;
|
}
|
} else if (!this$consumer.equals(other$consumer)) {
|
return false;
|
}
|
Object this$timer = getTimer();
|
Object other$timer = other.getTimer();
|
return this$timer == null ? other$timer == null : this$timer.equals(other$timer);
|
}
|
return false;
|
}
|
return false;
|
}
|
|
protected boolean canEqual(final Object other) {
|
return other instanceof DelayList;
|
}
|
|
public int hashCode() {
|
int result = (1 * 59) + getMaxWait();
|
int result2 = (result * 59) + getMaxLen();
|
Object $queue = getQueue();
|
int result3 = (result2 * 59) + ($queue == null ? 43 : $queue.hashCode());
|
Object $consumer = getConsumer();
|
int result4 = (result3 * 59) + ($consumer == null ? 43 : $consumer.hashCode());
|
Object $timer = getTimer();
|
return (result4 * 59) + ($timer == null ? 43 : $timer.hashCode());
|
}
|
|
public String toString() {
|
return "DelayList(maxWait=" + getMaxWait() + ", maxLen=" + getMaxLen() + ", queue=" + getQueue() + ", consumer=" + getConsumer() + ", timer=" + getTimer() + ")";
|
}
|
|
public int getMaxWait() {
|
return this.maxWait;
|
}
|
|
public int getMaxLen() {
|
return this.maxLen;
|
}
|
|
public BlockingQueue<T> getQueue() {
|
return this.queue;
|
}
|
|
public Consumer<List<T>> getConsumer() {
|
return this.consumer;
|
}
|
|
public Thread getTimer() {
|
return this.timer;
|
}
|
|
public DelayList(int maxWait) {
|
this.queue = new ArrayBlockingQueue<>(100000, true);
|
this.maxWait = maxWait;
|
this.maxLen = 1000;
|
}
|
|
public DelayList(int maxWait, int maxLen) {
|
this.queue = new ArrayBlockingQueue<>(100000, true);
|
this.maxWait = maxWait;
|
this.maxLen = maxLen;
|
}
|
|
public synchronized void add(T t) {
|
add(t, this.consumer);
|
}
|
|
public synchronized void add(T t, Consumer<List<T>> consumer) {
|
this.queue.add(t);
|
if (this.timer == null) {
|
System.out.println("创建消费线程");
|
this.timer = new Thread(() -> {
|
ArrayList<T> arrayList = new ArrayList<>();
|
Queues.drainUninterruptibly(this.queue, arrayList, this.maxLen, this.maxWait, TimeUnit.SECONDS);
|
if (arrayList.size() != 0) {
|
consumer.accept(arrayList);
|
this.timer.run();
|
return;
|
}
|
this.timer.interrupt();
|
this.timer = null;
|
System.out.println("结束消费线程");
|
});
|
this.timer.start();
|
}
|
}
|
|
public static void main(String[] args) throws InterruptedException {
|
DelayList<Integer> delayList = new DelayList<>(10, 2000);
|
for (int i = 0; i < 30000; i++) {
|
Thread.sleep(1L);
|
delayList.add(Integer.valueOf(i), list -> {
|
System.out.println("批量执行" + DateUtil.format(new Date(), DateConstant.PATTERN_DATE_TIME) + " : " + list.size());
|
});
|
}
|
}
|
}
|