From: markspace on
Hole wrote:

> Now, I'd like the SchedulerTask to finish its job and return
> (executing the overridden done() method inherited from the SwingWorker
> class) only when all N SingleTasks have completed their work. Currently,
> SchedulerTask finishes its work right after launching the N
> SingleTask(s), without waiting for their completion.


1. You should wait. Use a countdown latch.

<http://java.sun.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html>


>
> I've tried to insert a while(finishedProcesses<N) do-nothing loop, but
> the GUI had dramatic responsiveness and performance problems.


2. I think you're doing your while loop in the "done" method. Don't do
that. Wait in doInBackground.


>
> Have you ever dealt with this kind of stuff in Swing?


I'll try to cook up an example for you later, but those two points
should get you pointed in the right direction. I didn't read the code
you posted carefully, but after a glance a couple of things jumped out
at me.


> class SchedulerTask extends SwingWorker<Integer, Void> implements
> ActionListener {


"implements ActionListener" here strikes me as wrong. You want your
action listener and your worker to be two separate objects.
SwingWorkers can't be reused, so there's no point to keeping one around.


> class BatchTask extends SwingWorker<List<SampledValue>, Void> {


You only need one SwingWorker to synchronize. These secondary tasks
would be better as Runnables and handed off to an Executor.



Final thoughts: Java Concurrency in Practice is an excellent book; you
really need it for this sort of work.

<http://www.javaconcurrencyinpractice.com/>
From: Douwe on
Where should I start :/. First of all, the field
SchedulerTask.activeProcesses is accessed from different threads, so
you should make it volatile. The next piece of code is causing your
processor (or at least one core of it) to go to 100%:

while (activeProcesses>0) {
//do nothing...waiting...
}

You could insert a wait in there, like this:

// you can move the lock to the top of the class as a field
final Object lock = new Object();
while (activeProcesses>0) {
    // puts the current thread into the wait state for 100 ms (you could
    // also use Thread.sleep instead, but I prefer Object.wait())
    synchronized(lock) {
        try { lock.wait(100L); } catch(InterruptedException ignore) {}
    }
}


Next is the main loop

while (lastIndex<items.size()) {
    while (lastIndex<items.size() && activeProcesses<MAX_ACTIVE_PROCESSES) {
        currentProcesses.add(this.items.get(lastIndex));
        lastIndex++;
        activeProcesses++;
    }

    for (BatchExtractionItem item: currentProcesses) {
        if (!item.getStatus().equals(BatchExtractionItem.COMPLETED)
                && !item.getStatus().equals(BatchExtractionItem.PROCESSING)) {
            item.setStatus(BatchExtractionItem.PROCESSING);
            BatchTask task = new BatchTask(item);
            task.execute();
        }
    }
}


This part is split into two subparts: the first adds items to
currentProcesses as long as activeProcesses is below a certain limit.
The second then runs through the list of currentProcesses and starts
BatchTasks for items that have not started yet. Now imagine what
happens when activeProcesses is exactly MAX_ACTIVE_PROCESSES: the
first part will not enter its loop, and the second part will enter its
loop but see that all processes are running, and thus do nothing more
than check. These two parts then get repeated over and over until one
of the BatchTasks finishes. This loop will therefore also cause the
CPU (or one core) to go to 100%. Try implementing something like the
following piece of code:



private final Object lock = new Object();
private volatile int countCompleted;
private volatile int activeProcesses;

public void processCompleted(BatchExtractionItem completedItem) {
    //System.out.println("Completed "+completedItem);

    synchronized(lock) {
        activeProcesses--;
        countCompleted++;
        lock.notifyAll(); // wake up the main thread
    }
}


enum State { WAITING, RUN_NEXT }

private State state = State.WAITING;


protected Integer doInBackground() throws Exception {

    List<BatchExtractionItem> queuedItems = new ArrayList<BatchExtractionItem>();
    queuedItems.addAll(items);
    activeProcesses = 0;
    countCompleted = 0;
    boolean keepRunning = true;

    while (keepRunning) {
        switch(state) {
            case WAITING : {
                synchronized(lock) {
                    if (activeProcesses<MAX_ACTIVE_PROCESSES) {
                        if (queuedItems.isEmpty()) {
                            if (activeProcesses==0) {
                                keepRunning = false;
                                break;
                            }
                        } else {
                            state = State.RUN_NEXT;
                            break;
                        }
                    }
                    try {
                        // wait for 20 seconds max (or a notify from
                        // processCompleted) and then check again
                        lock.wait(20000L);
                    } catch(InterruptedException ignore) {}
                }
            } break;

            case RUN_NEXT : {
                // take the last queued item
                BatchExtractionItem item = queuedItems.remove(queuedItems.size() - 1);
                if (!item.getStatus().equals(BatchExtractionItem.COMPLETED)
                        && !item.getStatus().equals(BatchExtractionItem.PROCESSING)) {
                    item.setStatus(BatchExtractionItem.PROCESSING);
                    BatchTask task = new BatchTask(item);
                    task.execute();
                    activeProcesses++;
                } else {
                    // spit out a warning
                    System.err.println("warn: next item was already processing or has completed");
                    // countCompleted++; // add this if it should be counted as a completed task
                }
                state = State.WAITING;
            } break;
        }
    }
    // no further checking needed ... all items have finished processing
    return 0;
}



@Override
public void actionPerformed(ActionEvent e) {
    // issued by Timer event
    // scale to a percentage; plain integer division would yield only 0 or 1
    int progress = 100 * countCompleted / items.size();
    setProgress(progress);
    form.getProgressBar().setValue(progress);
}



Please note that I didn't test the code, and I edited it without a
Java-compatible IDE, so you might need to make some small
adjustments.


Regards,
Douwe Vos
From: markspace on
Here's what I came up with:

package swingthreads;

import java.awt.BorderLayout;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;

public class Main
{

    public static void main( String[] args )
    {
        SwingUtilities.invokeLater( new Runnable()
        {
            public void run()
            {
                createGui();
            }
        } );
    }

    private static void createGui()
    {
        final ExampleFrame frame = new ExampleFrame();
        final Executor exe = Executors.newFixedThreadPool( 5 );

        ActionListener batchKicker = new ActionListener()
        {

            public void actionPerformed( ActionEvent e )
            {
                ExampleWorker worker =
                        new ExampleWorker( exe, 5, frame.getOutput() );
                worker.execute();
            }
        };

        frame.addButtonListener( batchKicker );
        frame.setVisible( true );
    }
}

class ExampleFrame
        extends JFrame
{

    JTextArea output;
    JButton button;

    public ExampleFrame()
    {
        super( "Example Batch Threads" );
        output = new JTextArea();
        JScrollPane sp = new JScrollPane( output );
        add( sp );
        button = new JButton( "Start" );
        JPanel panel = new JPanel( new FlowLayout( FlowLayout.CENTER ) );
        panel.add( button );
        add( panel, BorderLayout.SOUTH );
        setLocationRelativeTo( null );
        setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );
        setSize( 300, 300 );
    }

    public JTextArea getOutput()
    {
        return output;
    }

    public void addButtonListener( ActionListener a )
    {
        button.addActionListener( a );
    }
}

class ExampleWorker
        extends SwingWorker<Void, Void>
{

    Executor exe;
    JTextArea output;
    int tasks;

    public ExampleWorker( Executor exe,
            int tasks, JTextArea output )
    {
        this.exe = exe;
        this.tasks = tasks;
        this.output = output;
    }

    @Override
    protected Void doInBackground()
            throws Exception
    {
        CountDownLatch latch = new CountDownLatch( tasks );
        for( int i = 0; i < tasks; i++ ) {
            ExampleBatchTask batch =
                    new ExampleBatchTask( output, latch );
            exe.execute( batch );
        }
        latch.await();
        return null;
    }

    @Override
    public void done()
    {
        output.append( "All done!\n" );
    }
}

class ExampleBatchTask
        implements Runnable
{

    JTextArea output;
    CountDownLatch latch;

    public ExampleBatchTask( JTextArea output, CountDownLatch latch )
    {
        this.output = output;
        this.latch = latch;
    }

    public void run()
    {
        int interval = (int) (Math.random() * 4 + 1);
        // append() is documented as thread safe in Java 6; later releases
        // dropped that note, so there wrap it in SwingUtilities.invokeLater()
        output.append( "Started: " + interval + "\n" );
        for( int i = 0; i < 5; i++ ) {
            try {
                Thread.sleep( interval * 1000 );
            } catch( InterruptedException ex ) {
                break; // exit immediately if interrupted
            }
            output.append( "running " + interval + "\n" );
        }
        latch.countDown();
    }
}
From: Lew on
Hole wrote:
> I have a method (issued by an actionPerformed) that instantiates a
> SchedulerTask object (a SwingWorker object) and calls the execute()
> method on it.
> This execute() method instantiates N SingleTasks and calls the execute()
> method on them.
>

By 'SingleTask' I assume you mean 'BatchTask'.

It struck me that you're invoking a 'SwingWorker' from the
'doInBackground()' method of a 'SwingWorker'. Don't do that. Just
spawn a thread - you're already off the Event Dispatch Thread (EDT), so
the purpose of 'SwingWorker' is already achieved.

Your 'doInBackground()' method can do a 'Thread#join()' on each sub-
task after having spawned all of them. That way it's guaranteed that
all sub-tasks have completed before the parent ('SwingWorker') task
returns.
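
A minimal sketch of that spawn-then-join pattern, with no Swing involved.
The class and method names ('JoinSketch', 'runAndJoin') are made up for
illustration, and the body of each thread is only a placeholder for the
real sub-task work; in the actual program the loop would sit inside
'doInBackground()'.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class JoinSketch {

    // Hypothetical stand-in for the body of doInBackground():
    // start every sub-task, then join each one before returning.
    static int runAndJoin(int taskCount) {
        final AtomicInteger finished = new AtomicInteger();
        List<Thread> threads = new ArrayList<Thread>();

        for (int i = 0; i < taskCount; i++) {
            Thread t = new Thread(new Runnable() {
                public void run() {
                    // the real sub-task work would go here
                    finished.incrementAndGet();
                }
            });
            threads.add(t);
            t.start();
        }

        for (Thread t : threads) {
            try {
                t.join(); // blocks until this sub-task has terminated
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // keep the interrupt status
                break; // stop waiting if the worker itself is interrupted
            }
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished: " + runAndJoin(4));
    }
}
```

Because 'join()' only returns once the thread has terminated, anything the
sub-tasks wrote is visible to the parent afterwards, and 'done()' cannot
fire early.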

--
Lew
From: Lew on
Douwe wrote:
> Where should I start :/. First of all the field
>

Where should I start? The example you provided is rife with
unsynchronized access to shared data.

> ... Try implementing
> something like the following piece of code
>
> private final Object lock = new Object();
> private volatile int countCompleted;
> pricate volatile int activeProcesses;
>

You're controlling these variables with different monitors - sometimes
'lock', sometimes the internal one due to being 'volatile'. I'm not
convinced that the relationship between these variables is reliably
readable.

> public void processCompleted(BatchExtractionItem completedItem) {
>         //System.out.println("Completed "+completedItem);
>
>         synchronized(lock) {
>                 activeProcesses--;
>                 countCompleted++;
>                 lock.notifyAll();       // wake up the main thread
>         }
>
> }
>
> enum State { WAITING, RUN_NEXT };
>
> protected Integer doInBackground() throws Exception {
>
>         List<BatchExtractionItem> queuedItems = new
> ArrayList<BatchExtractionItem>();
>         queuedItems.addAll(items);
>         activeProcesses = 0;
>         countCompleted = 0;
>         boolean keepRunning = true;
>         BatchExtractionItem itemToRun = null;
>
>         while (keep_running) {
>                 switch(state) {

The read of 'state' is not synchronized with the write.

>                         case WAITING : {
>                                 synchronized(lock) {
>                                         if (activeProcesses<MAX_ACTIVE_PROCESSES) {
>                                                 if (queuedItems.isEmpty()) {
>                                                         if (activeProcesses==0) {
>                                                                 keep_running = false;
>                                                                 break;
>                                                         }
>                                                 } else {
>                                                         state = FIND_TO_RUN;
>                                                         break;
>                                                 }
>                                         }
>                                         try {
>                                                 lock.wait(20000l);              // wait for 20 seconds max
> (or a notify from processCompleted) and then check again
>                                         } catch(Exception ignore) {}
>                                 }
>                          } break;
>
>                          case RUN_NEXT : {
>                                 BatchExtractionItem item = queuedItems.removeLast(queuedItems);

You don't synchronize the change to 'queuedItems'.

>                                 if (!item..getStatus().equals(BatchExtractionItem.COMPLETED) && !

or the 'getStatus()' read.

> item.getStatus().equals(BatchExtractionItem.PROCESSING)) {
>                                         item.setStatus(BatchExtractionItem.PROCESSING);
>                                         BatchTask task = new BatchTask(item);
>                                         task.execute();
>                                         activeProcesses++;
>                                 } else {
>                                         // spitt out a warning
>                                         System.err.println("warn: next item was already processing or has
> completed");
>                                         // countCompleted++;   // add this if it should be counted as a
> completed task
>                                 }
>                                 state = WAITING;
>                         } break;
>                 }
>         }
>         // no further checking needed ... all items have finised processing
>         return 0;
>
> }
>
> @Override
> public void actionPerformed(ActionEvent e) {
>         //issued by Timer event
>         int progress = countCompleted/items.size();
>         setProgress(progress);
>         form.getProgressBar().setValue(progress);
>
> }
>

I'm having difficulty reasoning about the synchronization the way
you've written all this. I suspect there are subtle threading bugs
there.
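
One way to make the bookkeeping easier to reason about is to guard every
read and write of the counters with the same monitor and drop 'volatile'
altogether. The following is only a sketch under that assumption;
'CompletionTracker' and its methods are hypothetical names, not part of
the posted code.

```java
class CompletionTracker {
    private final Object lock = new Object();
    private int active;    // guarded by lock
    private int completed; // guarded by lock

    // Call before handing a task to its thread.
    void taskStarted() {
        synchronized (lock) {
            active++;
        }
    }

    // Call from the task's thread when it has finished.
    void taskCompleted() {
        synchronized (lock) {
            active--;
            completed++;
            lock.notifyAll(); // wake up the waiting thread
        }
    }

    // Blocks until no task is active, then returns the completed count.
    int awaitIdle() throws InterruptedException {
        synchronized (lock) {
            while (active > 0) {
                lock.wait(); // releases the lock while waiting
            }
            return completed;
        }
    }
}

class CompletionTrackerDemo {
    static int runDemo(int taskCount) {
        final CompletionTracker tracker = new CompletionTracker();
        for (int i = 0; i < taskCount; i++) {
            tracker.taskStarted(); // counted before the thread starts
            new Thread(new Runnable() {
                public void run() {
                    tracker.taskCompleted();
                }
            }).start();
        }
        try {
            return tracker.awaitIdle();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runDemo(3));
    }
}
```

With a single monitor, both counters always change together from the point
of view of any thread holding the lock, so the waiting loop never sees a
half-updated pair.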

--
Lew