среда, 9 ноября 2011 г.

Разделение выполнения задачи на n потоков

Бывает при решении некоторых задач, таких как загрузка данных с FTP приходится разделять ход выполнения задачи на несколько потоков.
Казалось бы, все просто. Создаем n-е количество потоков (каждый из которых делает часть общей работы) и запускаем их:
public class MainClass {
  public static void main(String[] args) {
    WorkClass work = new WorkClass();
    work.doSomeWork();
  }
}

public class WorkClass {

  public void doSomeWork() {             
    for (int i = 0; i < 5; i++) { 
      Worker worker = new Worker("directory" + i);
      Thread thread = new Thread(worker);
      thread.start();
    }
    
    //here we must wait for execution of all threads

    //doing work on    
  }

  public class Worker implements Runnable {

    private String dir;

    public Worker(String dir) {
      this.dir = dir;
    }
  
    @Override
    public void run() {
      doPieceOfWork();
    }

    private void doPieceOfWork() {
      //doing loading from FTP, depending on the name of directory ...
    }
  }
}
Здесь в качестве примера разделения выполняемой работы на 5 потоков приводится загрузка по FTP. Каждый поток грузит свою папку. Но может возникнуть проблема. Надо подождать, пока выполнятся все запущенные потоки, и только после этого продолжать работу основного потока. Для этого обернем вызов потоков в блок синхронизации.
public void doSomeWork() {
  synchronized (this) {             
    for (int i = 0; i < 5; i++) { 
      Worker worker = new Worker("directory" + i);
      Thread thread = new Thread(worker);
      thread.start();
    }
  }
}
Далее остановим главный поток после запуска второстепенных потоков до тех пор, пока не получим ответ о завершении работы каждого из них. Для этого сделаем цикл, в котором получаем уведомление от одного из потоков и, если количество завершенных второстепенных потоков меньше количества запущенных, продолжаем ожидать ответа от оставшихся:
public void doSomeWork() {
  synchronized (this) { 
    Vector events = new Vector();
            
    for (int i = 0; i < 5; i++) {       
      Worker worker = new Worker(this, "directory" + i, events);
      Thread thread = new Thread(worker);
      thread.start();
    }

    //if all threads are dead, go ahead
    while (true) {
      try {
        this.wait();
      } catch (InterruptedException e) {
        errorLogger.error(e.getMessage(), e);
      }
      if (events.size() == 5) {
        break;
      }
    }        
  }

  //doing work on
}
теперь надо добавить во второстепенные потоки уведомления, которые будет получать основной поток:
public class Worker implements Runnable {

  private String dir;

  private Vector events;

  public Worker(WorkClass monitor, String dir, Vector events) {
    this.dir = dir;
    this.events = events;
  }
  
  @Override
  public void run() {
    doPieceOfWork();
  }

  private void doPieceOfWork() {
    //doing loading from FTP, depending on the name of directory ...
    
    synchronized (monitor) {
      events.add(new Object());
      monitor.notify();      
    }
  }
}
Cледует обратить внимание, что блоки синхронизации во второстепенных потоках и в главном создаются по монитору work класса WorkClass. Также в класс Worker добавлена коллекция events, которая выполняет роль счетчика завершенных потоков. Класс Vector выбран не случайно, тк его методы синхронизированы. В качестве его альтернативы можно воспользоваться Collections.synchronizedList(new ArrayList()). Хотя в данном случае синхронизация методов коллекции и не будет играть никакой роли, тк работа с ней происходит в блоке синхронизации. Все, теперь метод doSomeWork() разделит загрузку по FTP на 5 потоков и продолжит свою работу после завершения всех запущенных потоков. Общий результат выглядит так:
public class MainClass {
  public static void main(String[] args) {
    WorkClass work = new WorkClass();
    work.doSomeWork();
  }
}

public class WorkClass {
  protected static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");

  public void doSomeWork() {
    synchronized (this) {     
      Vector events = new Vector();
        
      for (int i = 0; i < 5; i++) { 
        Worker worker = new Worker(this, "directory" + i, events);
        Thread thread = new Thread(worker);
        thread.start();
      }
      
      //if all threads are dead, go ahead
      while (true) {
        try {
          this.wait();
        } catch (InterruptedException e) {
          errorLogger.error(e.getMessage(), e);
        }
        if (events.size() == 5) {
          break;
        }
      }
    }
    
    //doing work on 
  }

  public class Worker implements Runnable {

    private String dir;

    private Vector events;

    public Worker(WorkClass monitor, String dir, Vector events) {
      this.dir = dir;
      this.events = events;
    }
  
    @Override
    public void run() {
      doPieceOfWork();
    }

    private void doPieceOfWork() {
      //doing loading from FTP, depending on the name of directory ...
    
      synchronized (monitor) {
        events.add(new Object());
        monitor.notify();        
      }
    }
  }
}

Комментариев нет:

Отправить комментарий