yangys
2024-10-30 25db770e621f1259b8d5b7fd514207f7481c2d0f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.qianwen.smartman.common.utils;
 
import cn.hutool.core.date.DateUtil;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import com.qianwen.smartman.common.constant.DateConstant;
 
public class DelayList<T> {
    private final int maxWait;
    private final int maxLen;
    private BlockingQueue<T> queue;
    private Consumer<List<T>> consumer;
    private Thread timer;
 
    public void setQueue(final BlockingQueue<T> queue) {
        this.queue = queue;
    }
 
    public void setConsumer(final Consumer<List<T>> consumer) {
        this.consumer = consumer;
    }
 
    public void setTimer(final Thread timer) {
        this.timer = timer;
    }
 
    public boolean equals(final Object o) {
        if (o == this) {
            return true;
        }
        if (o instanceof DelayList) {
            DelayList<?> other = (DelayList<?>) o;
            if (other.canEqual(this) && getMaxWait() == other.getMaxWait() && getMaxLen() == other.getMaxLen()) {
                Object this$queue = getQueue();
                Object other$queue = other.getQueue();
                if (this$queue == null) {
                    if (other$queue != null) {
                        return false;
                    }
                } else if (!this$queue.equals(other$queue)) {
                    return false;
                }
                Object this$consumer = getConsumer();
                Object other$consumer = other.getConsumer();
                if (this$consumer == null) {
                    if (other$consumer != null) {
                        return false;
                    }
                } else if (!this$consumer.equals(other$consumer)) {
                    return false;
                }
                Object this$timer = getTimer();
                Object other$timer = other.getTimer();
                return this$timer == null ? other$timer == null : this$timer.equals(other$timer);
            }
            return false;
        }
        return false;
    }
 
    protected boolean canEqual(final Object other) {
        return other instanceof DelayList;
    }
 
    public int hashCode() {
        int result = (1 * 59) + getMaxWait();
        int result2 = (result * 59) + getMaxLen();
        Object $queue = getQueue();
        int result3 = (result2 * 59) + ($queue == null ? 43 : $queue.hashCode());
        Object $consumer = getConsumer();
        int result4 = (result3 * 59) + ($consumer == null ? 43 : $consumer.hashCode());
        Object $timer = getTimer();
        return (result4 * 59) + ($timer == null ? 43 : $timer.hashCode());
    }
 
    public String toString() {
        return "DelayList(maxWait=" + getMaxWait() + ", maxLen=" + getMaxLen() + ", queue=" + getQueue() + ", consumer=" + getConsumer() + ", timer=" + getTimer() + ")";
    }
 
    public int getMaxWait() {
        return this.maxWait;
    }
 
    public int getMaxLen() {
        return this.maxLen;
    }
 
    public BlockingQueue<T> getQueue() {
        return this.queue;
    }
 
    public Consumer<List<T>> getConsumer() {
        return this.consumer;
    }
 
    public Thread getTimer() {
        return this.timer;
    }
 
    public DelayList(int maxWait) {
        this.queue = new ArrayBlockingQueue<>(100000, true);
        this.maxWait = maxWait;
        this.maxLen = 1000;
    }
 
    public DelayList(int maxWait, int maxLen) {
        this.queue = new ArrayBlockingQueue<>(100000, true);
        this.maxWait = maxWait;
        this.maxLen = maxLen;
    }
 
    public synchronized void add(T t) {
        add(t, this.consumer);
    }
 
    public synchronized void add(T t, Consumer<List<T>> consumer) {
        this.queue.add(t);
        if (this.timer == null) {
            System.out.println("创建消费线程");
            this.timer = new Thread(() -> {
                ArrayList<T> arrayList = new ArrayList<>();
                Queues.drainUninterruptibly(this.queue, arrayList, this.maxLen, this.maxWait, TimeUnit.SECONDS);
                if (arrayList.size() != 0) {
                    consumer.accept(arrayList);
                    this.timer.run();
                    return;
                }
                this.timer.interrupt();
                this.timer = null;
                System.out.println("结束消费线程");
            });
            this.timer.start();
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        DelayList<Integer> delayList = new DelayList<>(10, 2000);
        for (int i = 0; i < 30000; i++) {
            Thread.sleep(1L);
            delayList.add(Integer.valueOf(i), list -> {
                System.out.println("批量执行" + DateUtil.format(new Date(), DateConstant.PATTERN_DATE_TIME) + " :  " + list.size());
            });
        }
    }
}