Executors: Wie synchron warten, bis alle Aufgaben abgeschlossen sind, wenn Tasks rekursiv erstellt werden?

Meine Frage ist hier stark verwandt. Wie ich dort gepostet habe, möchte ich, dass der Haupt-Thread wartet, bis die Arbeitswarteschlange leer ist und alle Aufgaben beendet sind. Das Problem in meiner Situation ist jedoch, dass jede Aufgabe rekursiv dazu führen kann, dass neue Aufgaben zur Verarbeitung übergeben werden. Dies macht es ein wenig schwierig, all die Aufgaben dieser Aufgaben zu sammeln.

Unsere aktuelle Lösung verwendet eine Busy-Wait-Schleife, um auf die Beendigung zu warten:

do { //Wait until we are done the processing try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } } while (!executor.getQueue().isEmpty() || numTasks.longValue() > executor.getCompletedTaskCount()); 

numTasks ist ein Wert, der erhöht wird, wenn jede neue Aufgabe erstellt wird. Das funktioniert, aber ich denke, es ist nicht sehr nett wegen der geschäftigen warten. Ich habe mich gefragt, ob es eine gute Möglichkeit gibt, den Haupt-Thread synchron zu warten, bis er explizit aufgeweckt wird.

Vielen Dank für all Ihre Vorschläge!

Am Ende entschied ich mich für etwas, das meiner Meinung nach relativ einfach ist. Ich habe herausgefunden, dass CountDownLatch fast das ist, was ich brauche. Es blockiert, bis der Zähler 0 erreicht. Das einzige Problem ist, dass es nur abwärts zählen kann, nicht aufwärts, und daher nicht in der dynamischen Einstellung funktioniert, in der Aufgaben neue Aufgaben übertragen können. Ich habe daher eine neue class CountLatch , die zusätzliche functionalität bietet. (siehe unten) Diese class verwende ich dann wie folgt.

Der Hauptthread ruft latch.awaitZero() und blockiert bis zum Erreichen von 0.

Jeder Thread ruft vor dem Aufruf von executor.execute(..) latch.increment() .

Jede Aufgabe, kurz bevor sie abgeschlossen wird, ruft latch.decrement() .

Wenn die letzte Aufgabe beendet wird, erreicht der Zähler 0 und gibt somit den Haupt-Thread frei.

Weitere Anregungen und Feedback sind herzlich willkommen!

 public class CountLatch { @SuppressWarnings("serial") private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected int acquireNonBlocking(int acquires) { // increment count for (;;) { int c = getState(); int nextc = c + 1; if (compareAndSetState(c, nextc)) return 1; } } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountLatch(int count) { this.sync = new Sync(count); } public void awaitZero() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void increment() { sync.acquireNonBlocking(1); } public void decrement() { sync.releaseShared(1); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } } 

Beachten Sie, dass die Aufrufe von increment() / decrement() in eine angepasste Executor Unterklasse eingekapselt werden können, wie es beispielsweise von Sami Korhonen vorgeschlagen wurde, oder mit beforeExecute und afterExecute wie von impl vorgeschlagen wurde. Siehe hier:

 public class CountingThreadPoolExecutor extends ThreadPoolExecutor { protected final CountLatch numRunningTasks = new CountLatch(0); public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public void execute(Runnable command) { numRunningTasks.increment(); super.execute(command); } @Override protected void afterExecute(Runnable r, Throwable t) { numRunningTasks.decrement(); super.afterExecute(r, t); } /** * Awaits the completion of all spawned tasks. */ public void awaitCompletion() throws InterruptedException { numRunningTasks.awaitZero(); } /** * Awaits the completion of all spawned tasks. */ public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException { numRunningTasks.awaitZero(timeout, unit); } } 

Dieser war eigentlich recht interessantes Problem zu lösen. Ich muss warnen, dass ich den Code nicht vollständig getestet habe.

Die Idee ist, einfach die Ausführung der Aufgabe zu verfolgen:

  • Wenn die Aufgabe erfolgreich in die Warteschlange gestellt wurde, wird der Zähler um eins erhöht
  • Wenn die Aufgabe abgebrochen wird und nicht ausgeführt wurde, wird der Zähler um eins dekrementiert
  • Wurde die Aufgabe ausgeführt, wird der Zähler um eins dekrementiert

Wenn das Herunterfahren aufgerufen wird und ausstehende Aufgaben vorhanden sind, ruft das Delegat das Herunterfahren des tatsächlichen ExecutorService nicht auf. Dadurch können neue Aufgaben in die Warteschlange eingereiht werden, bis die Anzahl der ausstehenden Aufgaben null erreicht und das Herunterfahren am tatsächlichen ExecutorService aufgerufen wird.

 public class ResilientExecutorServiceDelegate implements ExecutorService { private final ExecutorService executorService; private final AtomicInteger pendingTasks; private final Lock readLock; private final Lock writeLock; private boolean isShutdown; public ResilientExecutorServiceDelegate(ExecutorService executorService) { ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.pendingTasks = new AtomicInteger(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); this.executorService = executorService; this.isShutdown = false; } private  T addTask(Callable task) { T result; boolean success = false; // Increment pending tasks counter incrementPendingTaskCount(); try { // Call service result = task.call(); success = true; } catch (RuntimeException exception) { throw exception; } catch (Exception exception) { throw new RejectedExecutionException(exception); } finally { if (!success) { // Decrement pending tasks counter decrementPendingTaskCount(); } } return result; } private void incrementPendingTaskCount() { pendingTasks.incrementAndGet(); } private void decrementPendingTaskCount() { readLock.lock(); if (pendingTasks.decrementAndGet() == 0 && isShutdown) { try { // Shutdown executorService.shutdown(); } catch (Throwable throwable) { } } readLock.unlock(); } @Override public void execute(final Runnable task) { // Add task addTask(new Callable() { @Override public Object call() { executorService.execute(new Runnable() { @Override public void run() { try { task.run(); } finally { decrementPendingTaskCount(); } } }); return null; } }); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { // Call service return executorService.awaitTermination(timeout, unit); } @Override public  List> invokeAll(Collection< ? extends Callable> tasks) throws InterruptedException { // It's ok to increment by just one incrementPendingTaskCount(); try { return executorService.invokeAll(tasks); } finally { decrementPendingTaskCount(); } } @Override public  List> invokeAll( Collection< ? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException { // It's ok to increment by just one incrementPendingTaskCount(); try { return executorService.invokeAll(tasks, timeout, unit); } finally { decrementPendingTaskCount(); } } @Override public  T invokeAny(Collection< ? extends Callable> tasks) throws InterruptedException, ExecutionException { // It's ok to increment by just one incrementPendingTaskCount(); try { return executorService.invokeAny(tasks); } finally { decrementPendingTaskCount(); } } @Override public  T invokeAny(Collection< ? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { incrementPendingTaskCount(); try { return executorService.invokeAny(tasks, timeout, unit); } finally { decrementPendingTaskCount(); } } @Override public boolean isShutdown() { return isShutdown; } @Override public boolean isTerminated() { return executorService.isTerminated(); } @Override public void shutdown() { // Lock write lock writeLock.lock(); // Set as shutdown isShutdown = true; try { if (pendingTasks.get() == 0) { // Real shutdown executorService.shutdown(); } } finally { // Unlock write lock writeLock.unlock(); } } @Override public List shutdownNow() { // Lock write lock writeLock.lock(); // Set as shutdown isShutdown = true; // Unlock write lock writeLock.unlock(); return executorService.shutdownNow(); } @Override public  Future submit(final Callable task) { // Create execution status final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus(); // Add task return addTask(new Callable>() { @Override public Future call() { return new FutureDelegate( executorService.submit(new Callable() { @Override public T call() throws Exception { try { // Mark as executed futureExecutionStatus.setExecuted(); // Run the actual task return task.call(); } finally { decrementPendingTaskCount(); } } }), futureExecutionStatus); } }); } @Override public Future< ?> submit(final Runnable task) { // Create execution status final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus(); // Add task return addTask(new Callable>() { @Override @SuppressWarnings("unchecked") public Future< ?> call() { return new FutureDelegate( (Future) executorService.submit(new Runnable() { @Override public void run() { try { // Mark as executed futureExecutionStatus.setExecuted(); // Run the actual task task.run(); } finally { decrementPendingTaskCount(); } } }), futureExecutionStatus); } }); } @Override public  Future submit(final Runnable task, final T result) { // Create execution status final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus(); // Add task return addTask(new Callable>() { @Override public Future call() { return new FutureDelegate(executorService.submit( new Runnable() { @Override public void run() { try { // Mark as executed futureExecutionStatus.setExecuted(); // Run the actual task task.run(); } finally { decrementPendingTaskCount(); } } }, result), futureExecutionStatus); } }); } private class FutureExecutionStatus { private volatile boolean executed; public FutureExecutionStatus() { executed = false; } public void setExecuted() { executed = true; } public boolean isExecuted() { return executed; } } private class FutureDelegate implements Future { private Future future; private FutureExecutionStatus executionStatus; public FutureDelegate(Future future, FutureExecutionStatus executionStatus) { this.future = future; this.executionStatus = executionStatus; } @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = future.cancel(mayInterruptIfRunning); if (cancelled) { // Lock read lock readLock.lock(); // If task was not executed if (!executionStatus.isExecuted()) { decrementPendingTaskCount(); } // Unlock read lock readLock.unlock(); } return cancelled; } @Override public T get() throws InterruptedException, ExecutionException { return future.get(); } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return future.get(timeout, unit); } @Override public boolean isCancelled() { return future.isCancelled(); } @Override public boolean isDone() { return future.isDone(); } } } 

Java 7 bietet einen Synchronizer, der für diesen Anwendungsfall namens Phaser geeignet ist. Es ist ein wiederverwendbarer Hybrid aus CountDownLatch und CyclicBarrier, der die Anzahl registrierter Parteien sowohl erhöhen als auch verringern kann (ähnlich einem inkrementierbaren CountDownLatch).

Das Grundmuster für die Verwendung des Phasenverstellers in diesem Szenario besteht darin , Tasks beim Erstellen mit dem Phasenversteller zu registrieren und bei Fertigstellung anzukommen . Wenn die Anzahl der angekommenen Parteien mit der Anzahl der registrierten übereinstimmt, “rückt” der Phasensteller zur nächsten Phase vor und benachrichtigt alle wartenden Threads des Fortschritts, wenn dieser stattfindet.

Hier ist ein Beispiel, das ich erstellt habe und auf den rekursiven Task-Abschluss wartet. Es findet naiv die ersten paar Zahlen der Fibonacci-Sequenz zu Demonstrationszwecken:

 import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicLong; /** * An example of using a Phaser to wait for the completion of recursive tasks. * @author Voxelot */ public class PhaserExample { /** Workstealing threadpool with reduced queue contention. */ private static ForkJoinPool executors; /** * @param args the command line arguments */ public static void main(String[] args) throws InterruptedException { executors = new ForkJoinPool(); List sequence = new ArrayList<>(); for (int i = 0; i < 20; i++) { sequence.add(fib(i)); } System.out.println(sequence); } /** * Computes the nth Fibonacci number in the Fibonacci sequence. * @param n The index of the Fibonacci number to compute * @return The computed Fibonacci number */ private static Long fib(int n) throws InterruptedException { AtomicLong result = new AtomicLong(); //Flexible sychronization barrier Phaser phaser = new Phaser(); //Base task Task initialTask = new Task(n, result, phaser); //Register fib(n) calling thread phaser.register(); //Submit base task executors.submit(initialTask); //Make the calling thread arrive at the synchronization //barrier and wait for all future tasks to arrive. phaser.arriveAndAwaitAdvance(); //Get the result of the parallel computation. return result.get(); } private static class Task implements Runnable { /** The Fibonacci sequence index of this task. */ private final int index; /** The shared result of the computation. */ private final AtomicLong result; /** The synchronizer. */ private final Phaser phaser; public Task(int n, AtomicLong result, Phaser phaser) { index = n; this.result = result; this.phaser = phaser; //Inform synchronizer of additional work to complete. phaser.register(); } @Override public void run() { if (index == 1) { result.incrementAndGet(); } else if (index > 1) { //recurrence relation: Fn = Fn-1 + Fn-2 Task task1 = new Task(index - 1, result, phaser); Task task2 = new Task(index - 2, result, phaser); executors.submit(task1); executors.submit(task2); } //Notify synchronizer of task completion. phaser.arrive(); } } } 

Warum benutzt du keinen Zähler? Beispielsweise:

 private AtomicInteger counter = new AtomicInteger(0); 

und erhöhen Sie den Zähler um eins, bevor Sie die Aufgabe an die Warteschlange übergeben:

 counter.incrementAndGet(); 

und dekrementiere es am Ende der Aufgabe um eins:

 counter.decrementAndGet(); 

und der Check wäre so etwas wie:

 // ... while (counter.get() > 0); 

Java 7 bietet Unterstützung für rekursive Aufgaben über den ForkJoinPool- Executor. Es ist recht einfach zu benutzen und skaliert ziemlich gut, solange die Aufgaben selbst nicht zu trivial sind. Im Wesentlichen bietet es eine kontrollierte Schnittstelle, die es Aufgaben ermöglicht, auf den Abschluss von Teilaufgaben zu warten, ohne den zugrunde liegenden Thread auf unbestimmte Zeit zu blockieren.

Eine der vorgeschlagenen Optionen in den Antworten, auf die Sie verweisen, ist die Verwendung eines CompletionService

Sie können das in Ihrem Hauptthread wartende Warten durch Folgendes ersetzen:

 while (true) { Future< ?> f = completionService.take(); //blocks until task completes if (executor.getQueue().isEmpty() && numTasks.longValue() == executor.getCompletedTaskCount()) break; } 

Beachten Sie, dass getCompletedTaskCount nur eine ungefähre Zahl zurückgibt, so dass Sie möglicherweise eine bessere getCompletedTaskCount finden müssen.

Wenn Sie die Anzahl der zu wartenden Threads kennen und eine Zeile Code einfügen können, um die Anzahl für jeden Thread mithilfe von CountDownLatch zu erhöhen ( http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/) CountDownLatch.html ) Es kann Ihr Problem lösen

Da die letzte Aufgabe nicht weiß, dass es die letzte ist, glaube ich nicht, dass es möglich ist, dass diese Arbeit zu 100% korrekt funktioniert, ohne dass sie aufgezeichnet wird, wenn Aufgaben gestartet werden und wenn sie abgeschlossen sind.

Wenn mir der richtige Speicher zur Verfügung steht, gibt die Methode getQueue() eine Warteschlange zurück, die nur Aufgaben enthält, die noch auf ihre Ausführung warten, und keine, die gerade ausgeführt werden. Darüber hinaus ist getCompletedTaskCount() ungefähre getCompletedTaskCount() .

Die Lösung, über die ich nachdenke, sieht so aus, mit einem atomaren Zähler wie in Eng.Fouads Antwort und einer Bedingung, um dem Haupt-Thread zu signalisieren, dass er aufwachen soll (verzeihe die Abkürzungen der Einfachheit halber):

 public class MyThreadPoolExecutorState { public final Lock lock = new ReentrantLock(); public final Condition workDone = lock.newCondition(); public boolean workIsDone = false; } public class MyThreadPoolExecutor extends ThreadPoolExecutor { private final MyThreadPoolExecutorState state; private final AtomicInteger counter = new AtomicInteger(0); public MyThreadPoolExecutor(MyThreadPoolExecutorState state, ...) { super(...); this.state = state; } protected void beforeExecute(Thread t, Runnable r) { this.counter.incrementAndGet(); } protected void afterExecute(Runnable r, Throwable t) { if(this.counter.decrementAndGet() == 0) { this.state.lock.lock(); try { this.state.workIsDone = true; this.state.workDone.signal(); } finally { this.state.lock.unlock(); } } } } public class MyApp { public static void main(...) { MyThreadPoolExecutorState state = new MyThreadPoolExecutorState(); MyThreadPoolExecutor executor = new MyThreadPoolExecutor(state, ...); // Fire ze missiles! executor.submit(...); state.lock.lock(); try { while(state.workIsDone == false) { state.workDone.await(); } } finally { state.lock.unlock(); } } } 

Es könnte ein wenig eleganter sein (vielleicht nur ein getState() in Ihrem Thread-Pool-Executor oder etwas?), Aber ich denke, es sollte den Job erledigt bekommen. Es ist auch nicht getestet, also implementieren Sie auf eigene Gefahr …

Es ist erwähnenswert, dass diese Lösung definitiv fehlschlägt, wenn keine Aufgaben ausgeführt werden müssen – sie werden das Signal auf unbestimmte Zeit erwarten. Also starte den Executor nicht einmal, wenn du keine Aufgaben ausführen musst.


Edit: Beim zweiten Gedanken sollte das Inkrementieren des Atomzählers bei der Übergabe erfolgen, nicht unmittelbar vor der Ausführung der Aufgabe (weil die Warteschlangenbildung dazu führen könnte, dass der Zähler vorzeitig auf 0 fällt). Es ist wahrscheinlich sinnvoll, stattdessen die Methoden submit(...) zu überschreiben und möglicherweise auch remove(...) und shutdown() remove(...) (wenn Sie sie verwenden). Die allgemeine Idee bleibt jedoch dieselbe. (Aber je mehr ich darüber nachdenke, desto weniger schön ist es.)

Ich würde auch die Interna der class überprüfen, um zu sehen, ob Sie irgendwelche Kenntnisse daraus erhalten können: http://hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/ classn / Java / util / gleichzeitig / ThreadPoolExecutor.java . Die Methode tryTerminate() sieht interessant aus.

Sie könnten einen Atomzähler verwenden, um das Senden zu zählen (wie gesagt wurde, bevor Sie tatsächlich senden). Kombinieren Sie dies mit einem Semaphor und geben Sie es im afterExecute Hook frei, den ein ThreadPoolExecutor bereitstellt. Statt busy-waiting, rufen Sie semaphore.acquire( counter.get()) nachdem die erste Runde von Jobs übergeben wurde. Aber die Anzahl der Akquisitionen wird beim Aufruf von acquire zu klein sein, da der Zähler später steigen kann. Sie müssten die Acquire-Aufrufe mit dem Anstieg seit dem letzten Aufruf als Argument wiederholen, bis der Zähler nicht mehr ansteigt.