시간 초과 후 작업을 중단하는 ExecutorService
제한 시간이 제공 될 수 있는 ExecutorService 구현을 찾고 있습니다. ExecutorService에 제출 된 작업은 실행 시간 초과보다 오래 걸리면 중단됩니다. 그런 짐승을 구현하는 것은 그렇게 어려운 작업은 아니지만, 기존 구현에 대해 아는 사람이 있는지 궁금합니다.
다음은 아래 논의 중 일부를 기반으로 내가 생각 해낸 것입니다. 다른하실 말씀 있나요?
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
timeoutExecutor.shutdown();
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
thread.interrupt();
}
}
}
이를 위해 ScheduledExecutorService 를 사용할 수 있습니다 . 먼저 한 번만 제출하여 즉시 시작하고 생성 된 미래를 유지합니다. 그 후에 일정 기간 후에 보유 된 미래를 취소하는 새 작업을 제출할 수 있습니다.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
final Future handler = executor.submit(new Callable(){ ... });
executor.schedule(new Runnable(){
public void run(){
handler.cancel();
}
}, 10000, TimeUnit.MILLISECONDS);
그러면 10 초 동안 처리기 (중단 될 주요 기능)가 실행 된 다음 해당 특정 작업이 취소 (즉, 인터럽트)됩니다.
불행히도 솔루션에 결함이 있습니다. 에는 일종의 버그가 있으며이 질문ScheduledThreadPoolExecutor
에서도보고되었습니다 . 제출 된 작업을 취소해도 작업과 관련된 메모리 리소스가 완전히 해제되지는 않습니다. 자원은 작업이 만료 될 때만 해제됩니다.
따라서 TimeoutThreadPoolExecutor
상당히 긴 만료 시간 (일반적인 사용)으로를 만들고 작업을 충분히 빠르게 제출하면 작업이 실제로 성공적으로 완료 되었더라도 결국 메모리가 채워집니다.
다음 (매우 조잡한) 테스트 프로그램에서 문제를 확인할 수 있습니다.
public static void main(String[] args) throws InterruptedException {
ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
//ExecutorService service = Executors.newFixedThreadPool(1);
try {
final AtomicInteger counter = new AtomicInteger();
for (long i = 0; i < 10000000; i++) {
service.submit(new Runnable() {
@Override
public void run() {
counter.incrementAndGet();
}
});
if (i % 10000 == 0) {
System.out.println(i + "/" + counter.get());
while (i > counter.get()) {
Thread.sleep(10);
}
}
}
} finally {
service.shutdown();
}
}
프로그램은 생성 된 Runnable
s가 완료 될 때까지 기다리지 만 사용 가능한 메모리를 모두 소모합니다 .
나는 이것에 대해 잠시 동안했지만 불행히도 좋은 해결책을 찾지 못했습니다.
편집 :이 문제가 JDK 버그 6602600 으로보고되었으며 최근에 수정 된 것으로 나타 났습니다 .
FutureTask에서 작업을 래핑하고 FutureTask에 대한 시간 제한을 지정할 수 있습니다. 이 질문에 대한 내 대답의 예를보십시오.
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html에ExecutorService.shutDownNow()
설명 된 방법을 사용하는 것은 어떻습니까? 가장 간단한 해결책 인 것 같습니다.
문제는 JDK 버그 6602600 (2010-05-22에서 해결됨)이 아니라 원 안에 sleep (10)을 잘못 호출 한 것 같습니다. 참고로, 메인 스레드는 외부 원의 모든 분기에서 SLEEP (0)을 호출하여 자신의 작업을 실현하기 위해 다른 스레드에 직접 CHANCE를 제공해야합니다. Thread.sleep (0) 대신 Thread.yield ()를 사용하는 것이 더 좋습니다.
이전 문제 코드의 결과 수정 된 부분은 다음과 같습니다.
.......................
........................
Thread.yield();
if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}
//
// while (i > counter.get()) {
// Thread.sleep(10);
// }
최대 150,000,000 개의 테스트 된 원의 외부 카운터 양에서 올바르게 작동합니다.
설문 조사에 많은 시간을 보낸 후
마지막 으로이 문제를 해결하기 위해 invokeAll
방법을 사용 ExecutorService
합니다.
그러면 작업이 실행되는 동안 작업이 엄격하게 중단됩니다.
여기에 예가 있습니다
ExecutorService executorService = Executors.newCachedThreadPool();
try {
List<Callable<Object>> callables = new ArrayList<>();
// Add your long time task (callable)
callables.add(new VaryLongTimeTask());
// Assign tasks for specific execution timeout (e.g. 2 sec)
List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
for (Future<Object> future : futures) {
// Getting result
}
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
프로 당신은 또한 제출할 수있다 ListenableFuture
동일에 ExecutorService
.
코드의 첫 번째 줄을 약간 변경하십시오.
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListeningExecutorService
is the Listening feature of ExecutorService
at google guava project (com.google.guava) )
Using John W answer I created an implementation that correctly begin the timeout when the task starts its execution. I even write a unit test for it :)
However, it does not suit my needs since some IO operations do not interrupt when Future.cancel()
is called (ie when Thread.interrupted()
is called). Some examples of IO operation that may not be interrupted when Thread.interrupted()
is called are Socket.connect
and Socket.read
(and I suspect most of IO operation implemented in java.io
). All IO operations in java.nio
should be interruptible when Thread.interrupted()
is called. For example, that is the case for SocketChannel.open
and SocketChannel.read
.
Anyway if anyone is interested, I created a gist for a thread pool executor that allows tasks to timeout (if they are using interruptible operations...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf
What about this alternative idea :
- two have two executors :
- one for :
- submitting the task, without caring about the timeout of the task
- adding the Future resulted and the time when it should end to an internal structure
- one for executing an internal job which is checking the internal structure if some tasks are timeout and if they have to be cancelled.
- one for :
Small sample is here :
public class AlternativeExecutorService
{
private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;
public AlternativeExecutorService()
{
scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}
public void pushTask(OwnTask task)
{
ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable
futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}
public void shutdownInternalScheduledExecutor()
{
scheduledFuture.cancel(true);
scheduledExecutor.shutdownNow();
}
long getCurrentMillisecondsTime()
{
return Calendar.getInstance().get(Calendar.MILLISECOND);
}
class ListenableFutureTask
{
private final ListenableFuture<Void> future;
private final OwnTask task;
private final long milliSecEndTime;
private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
{
this.future = future;
this.task = task;
this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
}
ListenableFuture<Void> getFuture()
{
return future;
}
OwnTask getTask()
{
return task;
}
long getMilliSecEndTime()
{
return milliSecEndTime;
}
}
class TimeoutManagerJob implements Runnable
{
CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
{
return futureQueue;
}
@Override
public void run()
{
long currentMileSecValue = getCurrentMillisecondsTime();
for (ListenableFutureTask futureTask : futureQueue)
{
consumeFuture(futureTask, currentMileSecValue);
}
}
private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
{
ListenableFuture<Void> future = futureTask.getFuture();
boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
if (isTimeout)
{
if (!future.isDone())
{
future.cancel(true);
}
futureQueue.remove(futureTask);
}
}
}
class OwnTask implements Callable<Void>
{
private long timeoutDuration;
private TimeUnit timeUnit;
OwnTask(long timeoutDuration, TimeUnit timeUnit)
{
this.timeoutDuration = timeoutDuration;
this.timeUnit = timeUnit;
}
@Override
public Void call() throws Exception
{
// do logic
return null;
}
public long getTimeoutDuration()
{
return timeoutDuration;
}
public TimeUnit getTimeUnit()
{
return timeUnit;
}
}
}
참고URL : https://stackoverflow.com/questions/2758612/executorservice-that-interrupts-tasks-after-a-timeout
'program tip' 카테고리의 다른 글
Eclipse 프로젝트에서 일부 폴더를 제외하려면 어떻게해야합니까? (0) | 2020.09.18 |
---|---|
벡터 변환 (0) | 2020.09.18 |
Java-인터페이스 구현에서 메소드 이름 충돌 (0) | 2020.09.17 |
id_rsa.pub와 id_dsa.pub의 차이점은 무엇입니까? (0) | 2020.09.17 |
CSS 수직 정렬 플로팅 div (0) | 2020.09.17 |