From: markspace on 27 Nov 2009 11:52

Hole wrote:
> Now, I'd like the SchedulerTask to finish its job and return
> (executing the overriden done() method inherited by SwingWorker class)
> only when all N SingleTasks have completed their work. Currently, I
> have that SchedulerTask finishes its work after launched the N
> SingleTask(s) without waiting for their completetion.

1. You should wait. Use a countdown latch.

<http://java.sun.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html>

> I've tried to insert a while(finishedProcesses<N) do nothing cycle but
> the GUI had dramatical issues in responsiveness and performances.

2. I think you're doing your while loop in the "done" method. Don't do that: done() runs on the event dispatch thread, so looping there stalls the GUI. Wait in doInBackground.

> Have you ever dealt with this kind of stuff in Swing?

I'll try to cook up an example for you later, but those two points should get you pointed in the right direction.

I didn't read the code you posted carefully, but after a glance a couple of things jumped out at me.

> class SchedulerTask extends SwingWorker<Integer, Void> implements
> ActionListener {

"implements ActionListener" here strikes me as wrong. You want your action listener and your worker to be two separate objects. SwingWorkers can't be reused, so there's no point in keeping one around.

> class BatchTask extends SwingWorker<List<SampledValue>, Void> {

You only need one SwingWorker to synchronize. These secondary tasks would be better as Runnables handed off to an Executor.

Final thoughts: Java Concurrency in Practice is an excellent book; you really need it for this sort of work.

<http://www.javaconcurrencyinpractice.com/>
From: Douwe on 27 Nov 2009 12:02

Where should I start :/. First of all, the field SchedulerTask.activeProcesses is accessed from different threads, so you should make it volatile. The next piece of code is causing your processor (or at least one core of it) to go to 100%:

    while (activeProcesses>0) {
        //do nothing...waiting...
    }

You could insert a wait in there, like this:

    final Object lock = new Object(); // you can move this to the top of the class as a class field

    while (activeProcesses > 0) {
        synchronized (lock) {
            try {
                // puts the current thread into the wait state for 100 ms
                lock.wait(100L);
            } catch (InterruptedException ignore) {}
        }
    }

(You could also use Thread.sleep instead, but I prefer the Object.wait() method.)

Next is the main loop:

    while (lastIndex<items.size()) {
        while (lastIndex<items.size() && activeProcesses<MAX_ACTIVE_PROCESSES) {
            currentProcesses.add(this.items.get(lastIndex));
            lastIndex++;
            activeProcesses++;
        }
        for (BatchExtractionItem item: currentProcesses) {
            if (!item.getStatus().equals(BatchExtractionItem.COMPLETED)
                    && !item.getStatus().equals(BatchExtractionItem.PROCESSING)) {
                item.setStatus(BatchExtractionItem.PROCESSING);
                BatchTask task = new BatchTask(item);
                task.execute();
            }
        }
    }

This part is split into two subparts. The first adds items to currentProcesses as long as activeProcesses is below the limit. The second then runs through the list of currentProcesses and starts BatchTasks for items that have not started yet. Now imagine what happens when activeProcesses is exactly MAX_ACTIVE_PROCESSES: the first part will not enter its loop, and the second part will enter its loop but see that all processes are running and thus do nothing more than check. These two parts then get repeated over and over until one of the BatchTasks finishes. This loop will therefore also cause the CPU (or one core) to go to 100%.

Try implementing something like the following piece of code:

    private final Object lock = new Object();
    private volatile int countCompleted;
    private volatile int activeProcesses;

    // to be called by each BatchTask when it finishes
    public void processCompleted(BatchExtractionItem completedItem) {
        //System.out.println("Completed "+completedItem);
        synchronized (lock) {
            activeProcesses--;
            countCompleted++;
            lock.notifyAll(); // wake up the main thread
        }
    }

    enum State { WAITING, RUN_NEXT }

    protected Integer doInBackground() throws Exception {
        List<BatchExtractionItem> queuedItems = new ArrayList<BatchExtractionItem>();
        queuedItems.addAll(items);
        activeProcesses = 0;
        countCompleted = 0;
        boolean keepRunning = true;
        State state = State.WAITING;

        while (keepRunning) {
            switch (state) {
            case WAITING: {
                synchronized (lock) {
                    if (activeProcesses < MAX_ACTIVE_PROCESSES) {
                        if (queuedItems.isEmpty()) {
                            if (activeProcesses == 0) {
                                keepRunning = false;
                                break;
                            }
                        } else {
                            state = State.RUN_NEXT;
                            break;
                        }
                    }
                    try {
                        // wait for 20 seconds max (or a notify from
                        // processCompleted) and then check again
                        lock.wait(20000L);
                    } catch (InterruptedException ignore) {}
                }
            } break;

            case RUN_NEXT: {
                BatchExtractionItem item = queuedItems.remove(queuedItems.size() - 1);
                if (!item.getStatus().equals(BatchExtractionItem.COMPLETED)
                        && !item.getStatus().equals(BatchExtractionItem.PROCESSING)) {
                    item.setStatus(BatchExtractionItem.PROCESSING);
                    // count the task before starting it, under the same lock
                    // that processCompleted uses for the decrement
                    synchronized (lock) {
                        activeProcesses++;
                    }
                    BatchTask task = new BatchTask(item);
                    task.execute();
                } else {
                    // spit out a warning
                    System.err.println("warn: next item was already processing or has completed");
                    // countCompleted++; // add this if it should be counted as a completed task
                }
                state = State.WAITING;
            } break;
            }
        }
        // no further checking needed ... all items have finished processing
        return 0;
    }

    @Override
    public void actionPerformed(ActionEvent e) {
        // issued by Timer event
        // scale to 0..100 first; plain countCompleted/items.size() is integer
        // division and stays 0 until the very last item completes
        int progress = 100 * countCompleted / items.size();
        setProgress(progress);
        form.getProgressBar().setValue(progress);
    }

Please note that I didn't test the code, and I edited it without a Java-compatible IDE, so you might need to make some small adjustments.

Regards,
Douwe Vos
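The idea of replacing the busy loop with Object.wait() can be isolated in a small stand-alone sketch. It is only an illustration, not code from the original application: PendingCounter, taskFinished(), awaitZero() and demo() are hypothetical names, the tasks are plain threads that just sleep, and it uses Java 8 lambdas for brevity. The waiting thread checks the condition in a loop while holding the lock, and every finishing task calls notifyAll() under the same lock.

```java
/** Hypothetical helper: counts outstanding tasks and lets a thread wait for zero. */
class PendingCounter {
    private final Object lock = new Object();
    private int pending; // guarded by lock

    PendingCounter(int initial) {
        pending = initial;
    }

    /** Called by each task when it finishes. */
    void taskFinished() {
        synchronized (lock) {
            pending--;
            lock.notifyAll(); // wake the waiting thread so it re-checks the count
        }
    }

    /** Blocks without spinning until nothing is pending; false if interrupted. */
    boolean awaitZero() {
        synchronized (lock) {
            while (pending > 0) { // loop also covers spurious wakeups
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    int pending() {
        synchronized (lock) {
            return pending;
        }
    }

    /** Starts n short stand-in tasks, waits for all, returns what is left pending. */
    static int demo(int n) {
        final PendingCounter counter = new PendingCounter(n);
        for (int i = 0; i < n; i++) {
            final long delay = 10L * (i + 1);
            new Thread(() -> {
                try {
                    Thread.sleep(delay); // placeholder for real work
                } catch (InterruptedException ignored) {
                }
                counter.taskFinished();
            }).start();
        }
        counter.awaitZero();
        return counter.pending();
    }

    public static void main(String[] args) {
        System.out.println("pending after wait: " + demo(4)); // prints 0
    }
}
```

Because the count is only read and written while holding lock, no volatile field is needed here, and the waiting thread uses no CPU while it is blocked.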
From: markspace on 27 Nov 2009 13:44

Here's what I came up with:

package swingthreads;

import java.awt.BorderLayout;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;

public class Main {

    public static void main( String[] args )
    {
        SwingUtilities.invokeLater( new Runnable() {
            public void run() {
                createGui();
            }
        } );
    }

    private static void createGui()
    {
        final ExampleFrame frame = new ExampleFrame();
        final Executor exe = Executors.newFixedThreadPool( 5 );
        ActionListener batchKicker = new ActionListener() {
            public void actionPerformed( ActionEvent e ) {
                ExampleWorker worker =
                        new ExampleWorker( exe, 5, frame.getOutput() );
                worker.execute();
            }
        };
        frame.addButtonListener( batchKicker );
        frame.setVisible( true );
    }
}

class ExampleFrame extends JFrame {

    JTextArea output;
    JButton button;

    public ExampleFrame() {
        super( "Example Batch Threads" );
        output = new JTextArea();
        JScrollPane sp = new JScrollPane( output );
        add( sp );
        button = new JButton( "Start" );
        JPanel panel = new JPanel( new FlowLayout( FlowLayout.CENTER ) );
        panel.add( button );
        add( panel, BorderLayout.SOUTH );
        setLocationRelativeTo( null );
        setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );
        setSize( 300, 300 );
    }

    public JTextArea getOutput() {
        return output;
    }

    public void addButtonListener( ActionListener a ) {
        button.addActionListener( a );
    }
}

class ExampleWorker extends SwingWorker<Void, Void> {

    Executor exe;
    JTextArea output;
    int tasks;

    public ExampleWorker( Executor exe, int tasks, JTextArea output ) {
        this.exe = exe;
        this.tasks = tasks;
        this.output = output;
    }

    @Override
    protected Void doInBackground() throws Exception {
        CountDownLatch latch = new CountDownLatch( tasks );
        for( int i = 0; i < tasks; i++ ) {
            ExampleBatchTask batch = new ExampleBatchTask( output, latch );
            exe.execute( batch );
        }
        latch.await();
        return null;
    }

    @Override
    public void done() {
        output.append( "All done!\n" );
    }
}

class ExampleBatchTask implements Runnable {

    JTextArea output;
    CountDownLatch latch;

    public ExampleBatchTask( JTextArea output, CountDownLatch latch ) {
        this.output = output;
        this.latch = latch;
    }

    public void run() {
        int interval = (int) (Math.random() * 4 + 1);
        // append() is thread safe!
        output.append( "Started: " + interval + "\n" );
        for( int i = 0; i < 5; i++ ) {
            try {
                Thread.sleep( interval * 1000 );
            } catch( InterruptedException ex ) {
                break; // exit immediately if interrupted
            }
            output.append( "running " + interval + "\n" );
        }
        latch.countDown();
    }
}
From: Lew on 27 Nov 2009 13:48

Hole wrote:
> I have a method (issued by an actionPerformed) that instantiate a
> SchedulerTask object (a SwingWorker object) and call execute() method
> on it.
> This execute() method instantiate N SingleTasks and call the execute()
> method on them.

By 'SingleTask' I assume you mean 'BatchTask'.

It struck me that you're invoking a 'SwingWorker' from the 'doInBackground()' method of a 'SwingWorker'. Don't do that. Just spawn a thread - you're already off the Event Dispatch Thread (EDT), so the purpose of 'SwingWorker' is already achieved.

Your 'doInBackground()' method can do a 'Thread#join()' on each sub-task after having spawned all of them. That way it's guaranteed that all sub-tasks have completed before the parent ('SwingWorker') task returns.

--
Lew
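The join-based approach might look roughly like the sketch below. It is a stand-alone illustration under assumptions, not the poster's code: JoinDemo and runAll() are hypothetical names, runAll() stands in for the body of 'doInBackground()', the real per-item work is replaced by a counter increment, and Java 8 lambdas are used for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class JoinDemo {

    /** Stand-in for doInBackground(): spawn n sub-tasks, then join each one. */
    static int runAll(int n) {
        final AtomicInteger completed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        // First spawn every sub-task as a plain thread.
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                // placeholder for the real per-item work
                completed.incrementAndGet();
            }, "sub-task-" + i);
            threads.add(t);
            t.start();
        }

        // Then wait for each of them; join() returns once that thread has ended.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible
                break;
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAll(5)); // prints "completed: 5"
    }
}
```

Note that this starts all N sub-tasks at once. If the number of concurrently running tasks has to be capped, a fixed-size thread pool or an explicit limit is needed on top of the join.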
From: Lew on 27 Nov 2009 14:01

Douwe wrote:
> Where should I start :/. First of all the field

Where should I start? The example you provided is rife with unsynchronized access to shared data.

> ... Try implementing
> something like the following piece of code
>
> private final Object lock = new Object();
> private volatile int countCompleted;
> pricate volatile int activeProcesses;

You're guarding these variables in different ways - sometimes with the 'lock' monitor, sometimes only with the visibility guarantee of 'volatile'. I'm not convinced that the relationship between these variables is reliably readable.

> public void processCompleted(BatchExtractionItem completedItem) {
>     //System.out.println("Completed "+completedItem);
>
>     synchronized(lock) {
>         activeProcesses--;
>         countCompleted++;
>         lock.notifyAll(); // wake up the main thread
>     }
>
> }
>
> enum State { WAITING, RUN_NEXT };
>
> protected Integer doInBackground() throws Exception {
>
>     List<BatchExtractionItem> queuedItems = new
> ArrayList<BatchExtractionItem>();
>     queuedItems.addAll(items);
>     activeProcesses = 0;
>     countCompleted = 0;
>     boolean keepRunning = true;
>     BatchExtractionItem itemToRun = null;
>
>     while (keep_running) {
>         switch(state) {

The read of 'state' is not synchronized with the write.

>         case WAITING : {
>             synchronized(lock) {
>                 if (activeProcesses<MAX_ACTIVE_PROCESSES) {
>                     if (queuedItems.isEmpty()) {
>                         if (activeProcesses==0) {
>                             keep_running = false;
>                             break;
>                         }
>                     } else {
>                         state = FIND_TO_RUN;
>                         break;
>                     }
>                 }
>                 try {
>                     lock.wait(20000l); // wait for 20 seconds max
> (or a notify from processCompleted) and then check again
>                 } catch(Exception ignore) {}
>             }
>         } break;
>
>         case RUN_NEXT : {
>             BatchExtractionItem item = queuedItems.removeLast(queuedItems);

You don't synchronize the change to 'queuedItems'.

>             if (!item.getStatus().equals(BatchExtractionItem.COMPLETED) && !

or the 'getStatus()' read.

> item.getStatus().equals(BatchExtractionItem.PROCESSING)) {
>                 item.setStatus(BatchExtractionItem.PROCESSING);
>                 BatchTask task = new BatchTask(item);
>                 task.execute();
>                 activeProcesses++;
>             } else {
>                 // spitt out a warning
>                 System.err.println("warn: next item was already processing or has
> completed");
>                 // countCompleted++; // add this if it should be counted as a
> completed task
>             }
>             state = WAITING;
>         } break;
>         }
>     }
>     // no further checking needed ... all items have finised processing
>     return 0;
>
> }
>
> @Override
> public void actionPerformed(ActionEvent e) {
>     //issued by Timer event
>     int progress = countCompleted/items.size();
>     setProgress(progress);
>     form.getProgressBar().setValue(progress);
>
> }

I'm having difficulty reasoning about the synchronization the way you've written all this. I suspect there are subtle threading bugs there.

--
Lew
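One way to make the synchronization easier to reason about is to keep every shared counter behind a single monitor, so that each read and write happens while holding the same lock. The sketch below illustrates that idea only; it is not a drop-in replacement for the code quoted above. BatchTracker, acquireSlot(), taskCompleted(), awaitIdle() and demo() are hypothetical names, the sub-tasks are plain threads that just sleep, and Java 8 lambdas are used for brevity.

```java
/** Hypothetical tracker: all shared state is guarded by the object's own monitor. */
class BatchTracker {
    private final int maxActive;
    private int active;    // guarded by "this"
    private int completed; // guarded by "this"
    private int peak;      // guarded by "this"; highest value "active" reached

    BatchTracker(int maxActive) {
        this.maxActive = maxActive;
    }

    /** Blocks until fewer than maxActive tasks run, then claims a slot. */
    synchronized boolean acquireSlot() {
        while (active >= maxActive) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        active++;
        peak = Math.max(peak, active);
        return true;
    }

    /** Called by a task when it is finished; frees its slot. */
    synchronized void taskCompleted() {
        active--;
        completed++;
        notifyAll(); // wake threads blocked in acquireSlot() or awaitIdle()
    }

    /** Blocks until no task is active; false if interrupted. */
    synchronized boolean awaitIdle() {
        while (active > 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    synchronized int completed() { return completed; }

    synchronized int peak() { return peak; }

    /** Runs "items" stand-in tasks with at most maxActive of them at a time. */
    static BatchTracker demo(int items, int maxActive) {
        final BatchTracker tracker = new BatchTracker(maxActive);
        for (int i = 0; i < items; i++) {
            if (!tracker.acquireSlot()) {
                break; // interrupted while waiting for a free slot
            }
            new Thread(() -> {
                try {
                    Thread.sleep(5); // placeholder for real work
                } catch (InterruptedException ignored) {
                } finally {
                    tracker.taskCompleted();
                }
            }).start();
        }
        tracker.awaitIdle();
        return tracker;
    }

    public static void main(String[] args) {
        BatchTracker t = demo(10, 3);
        System.out.println("completed=" + t.completed() + " peak=" + t.peak());
    }
}
```

Since the slot is claimed before the thread is started and released in a finally block, the active count cannot exceed the limit, and no 'volatile' fields are needed because nothing is read outside the monitor.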