program tip

시간 초과 후 작업을 중단하는 ExecutorService

radiobox 2020. 9. 17. 07:30
반응형

시간 초과 후 작업을 중단하는 ExecutorService


제한 시간이 제공 될 수 있는 ExecutorService 구현을 찾고 있습니다. ExecutorService에 제출 된 작업은 실행 시간 초과보다 오래 걸리면 중단됩니다. 그런 짐승을 구현하는 것은 그렇게 어려운 작업은 아니지만, 기존 구현에 대해 아는 사람이 있는지 궁금합니다.

다음은 아래 논의 중 일부를 기반으로 내가 생각 해낸 것입니다. 다른하실 말씀 있나요?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}

이를 위해 ScheduledExecutorService사용할 수 있습니다 . 먼저 한 번만 제출하여 즉시 시작하고 생성 된 미래를 유지합니다. 그 후에 일정 기간 후에 보유 된 미래를 취소하는 새 작업을 제출할 수 있습니다.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

그러면 10 초 동안 처리기 (중단 될 주요 기능)가 실행 된 다음 해당 특정 작업이 취소 (즉, 인터럽트)됩니다.


불행히도 솔루션에 결함이 있습니다. 에는 일종의 버그가 있으며이 질문ScheduledThreadPoolExecutor 에서도보고되었습니다 . 제출 된 작업을 취소해도 작업과 관련된 메모리 리소스가 완전히 해제되지는 않습니다. 자원은 작업이 만료 될 때만 해제됩니다.

따라서 TimeoutThreadPoolExecutor상당히 긴 만료 시간 (일반적인 사용)으로를 만들고 작업을 충분히 빠르게 제출하면 작업이 실제로 성공적으로 완료 되었더라도 결국 메모리가 채워집니다.

다음 (매우 조잡한) 테스트 프로그램에서 문제를 확인할 수 있습니다.

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

프로그램은 생성 된 Runnables가 완료 될 때까지 기다리지 만 사용 가능한 메모리를 모두 소모합니다 .

나는 이것에 대해 잠시 동안했지만 불행히도 좋은 해결책을 찾지 못했습니다.

편집 :이 문제가 JDK 버그 6602600 으로보고되었으며 최근에 수정 된 것으로 나타 났습니다 .


FutureTask에서 작업을 래핑하고 FutureTask에 대한 시간 제한을 지정할 수 있습니다. 이 질문에 대한 내 대답의 예를보십시오.

자바 네이티브 프로세스 시간 초과


http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html에ExecutorService.shutDownNow() 설명 된 방법을 사용하는 것은 어떻습니까? 가장 간단한 해결책 인 것 같습니다.


문제는 JDK 버그 6602600 (2010-05-22에서 해결됨)이 아니라 원 안에 sleep (10)을 잘못 호출 한 것 같습니다. 참고로, 메인 스레드는 외부 원의 모든 분기에서 SLEEP (0)을 호출하여 자신의 작업을 실현하기 위해 다른 스레드에 직접 CHANCE를 제공해야합니다. Thread.sleep (0) 대신 Thread.yield ()를 사용하는 것이 더 좋습니다.

이전 문제 코드의 결과 수정 된 부분은 다음과 같습니다.

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

최대 150,000,000 개의 테스트 된 원의 외부 카운터 양에서 올바르게 작동합니다.


설문 조사에 많은 시간을 보낸 후
마지막 으로이 문제를 해결하기 위해 invokeAll방법을 사용 ExecutorService합니다.
그러면 작업이 실행되는 동안 작업이 엄격하게 중단됩니다.
여기에 예가 있습니다

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

프로 당신은 또한 제출할 수있다 ListenableFuture동일에 ExecutorService.
코드의 첫 번째 줄을 약간 변경하십시오.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorService is the Listening feature of ExecutorService at google guava project (com.google.guava) )


Using John W answer I created an implementation that correctly begin the timeout when the task starts its execution. I even write a unit test for it :)

However, it does not suit my needs since some IO operations do not interrupt when Future.cancel() is called (ie when Thread.interrupted() is called). Some examples of IO operation that may not be interrupted when Thread.interrupted() is called are Socket.connect and Socket.read (and I suspect most of IO operation implemented in java.io). All IO operations in java.nio should be interruptible when Thread.interrupted() is called. For example, that is the case for SocketChannel.open and SocketChannel.read.

Anyway if anyone is interested, I created a gist for a thread pool executor that allows tasks to timeout (if they are using interruptible operations...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf


What about this alternative idea :

  • two have two executors :
    • one for :
      • submitting the task, without caring about the timeout of the task
      • adding the Future resulted and the time when it should end to an internal structure
    • one for executing an internal job which is checking the internal structure if some tasks are timeout and if they have to be cancelled.

Small sample is here :

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}

참고URL : https://stackoverflow.com/questions/2758612/executorservice-that-interrupts-tasks-after-a-timeout

반응형