package com.qianwen.smartman.common.utils; import cn.hutool.core.date.DateUtil; import com.google.common.collect.Queues; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import com.qianwen.smartman.common.constant.DateConstant; public class DelayList { private final int maxWait; private final int maxLen; private BlockingQueue queue; private Consumer> consumer; private Thread timer; public void setQueue(final BlockingQueue queue) { this.queue = queue; } public void setConsumer(final Consumer> consumer) { this.consumer = consumer; } public void setTimer(final Thread timer) { this.timer = timer; } public boolean equals(final Object o) { if (o == this) { return true; } if (o instanceof DelayList) { DelayList other = (DelayList) o; if (other.canEqual(this) && getMaxWait() == other.getMaxWait() && getMaxLen() == other.getMaxLen()) { Object this$queue = getQueue(); Object other$queue = other.getQueue(); if (this$queue == null) { if (other$queue != null) { return false; } } else if (!this$queue.equals(other$queue)) { return false; } Object this$consumer = getConsumer(); Object other$consumer = other.getConsumer(); if (this$consumer == null) { if (other$consumer != null) { return false; } } else if (!this$consumer.equals(other$consumer)) { return false; } Object this$timer = getTimer(); Object other$timer = other.getTimer(); return this$timer == null ? other$timer == null : this$timer.equals(other$timer); } return false; } return false; } protected boolean canEqual(final Object other) { return other instanceof DelayList; } public int hashCode() { int result = (1 * 59) + getMaxWait(); int result2 = (result * 59) + getMaxLen(); Object $queue = getQueue(); int result3 = (result2 * 59) + ($queue == null ? 43 : $queue.hashCode()); Object $consumer = getConsumer(); int result4 = (result3 * 59) + ($consumer == null ? 43 : $consumer.hashCode()); Object $timer = getTimer(); return (result4 * 59) + ($timer == null ? 43 : $timer.hashCode()); } public String toString() { return "DelayList(maxWait=" + getMaxWait() + ", maxLen=" + getMaxLen() + ", queue=" + getQueue() + ", consumer=" + getConsumer() + ", timer=" + getTimer() + ")"; } public int getMaxWait() { return this.maxWait; } public int getMaxLen() { return this.maxLen; } public BlockingQueue getQueue() { return this.queue; } public Consumer> getConsumer() { return this.consumer; } public Thread getTimer() { return this.timer; } public DelayList(int maxWait) { this.queue = new ArrayBlockingQueue<>(100000, true); this.maxWait = maxWait; this.maxLen = 1000; } public DelayList(int maxWait, int maxLen) { this.queue = new ArrayBlockingQueue<>(100000, true); this.maxWait = maxWait; this.maxLen = maxLen; } public synchronized void add(T t) { add(t, this.consumer); } public synchronized void add(T t, Consumer> consumer) { this.queue.add(t); if (this.timer == null) { System.out.println("创建消费线程"); this.timer = new Thread(() -> { ArrayList arrayList = new ArrayList<>(); Queues.drainUninterruptibly(this.queue, arrayList, this.maxLen, this.maxWait, TimeUnit.SECONDS); if (arrayList.size() != 0) { consumer.accept(arrayList); this.timer.run(); return; } this.timer.interrupt(); this.timer = null; System.out.println("结束消费线程"); }); this.timer.start(); } } public static void main(String[] args) throws InterruptedException { DelayList delayList = new DelayList<>(10, 2000); for (int i = 0; i < 30000; i++) { Thread.sleep(1L); delayList.add(Integer.valueOf(i), list -> { System.out.println("批量执行" + DateUtil.format(new Date(), DateConstant.PATTERN_DATE_TIME) + " : " + list.size()); }); } } }