Казалось бы, все просто. Создаем n-е количество потоков (каждый из которых делает часть общей работы) и запускаем их:
public class MainClass { public static void main(String[] args) { WorkClass work = new WorkClass(); work.doSomeWork(); } } public class WorkClass { public void doSomeWork() { for (int i = 0; i < 5; i++) { Worker worker = new Worker("directory" + i); Thread thread = new Thread(worker); thread.start(); } //here we must wait for execution of all threads //doing work on } public class Worker implements Runnable { private String dir; public Worker(String dir) { this.dir = dir; } @Override public void run() { doPieceOfWork(); } private void doPieceOfWork() { //doing loading from FTP, depending on the name of directory ... } } }Здесь в качестве примера разделения выполняемой работы на 5 потоков приводится загрузка по FTP. Каждый поток грузит свою папку. Но может возникнуть проблема. Надо подождать, пока выполнятся все запущенные потоки, и только после этого продолжать работу основного потока. Для этого обернем вызов потоков в блок синхронизации.
public void doSomeWork() { synchronized (this) { for (int i = 0; i < 5; i++) { Worker worker = new Worker("directory" + i); Thread thread = new Thread(worker); thread.start(); } } }Далее остановим главный поток после запуска второстепенных потоков до тех пор, пока не получим ответ о завершении работы каждого из них. Для этого сделаем цикл, в котором получаем уведомление от одного из потоков и, если количество завершенных второстепенных потоков меньше количества запущенных, продолжаем ожидать ответа от оставшихся:
public void doSomeWork() { synchronized (this) { Vector events = new Vector(); for (int i = 0; i < 5; i++) { Worker worker = new Worker(this, "directory" + i, events); Thread thread = new Thread(worker); thread.start(); } //if all threads are dead, go ahead while (true) { try { this.wait(); } catch (InterruptedException e) { errorLogger.error(e.getMessage(), e); } if (events.size() == 5) { break; } } } //doing work on }теперь надо добавить во второстепенные потоки уведомления, которые будет получать основной поток:
public class Worker implements Runnable { private String dir; private Vector events; public Worker(WorkClass monitor, String dir, Vector events) { this.dir = dir; this.events = events; } @Override public void run() { doPieceOfWork(); } private void doPieceOfWork() { //doing loading from FTP, depending on the name of directory ... synchronized (monitor) { events.add(new Object()); monitor.notify(); } } }Cледует обратить внимание, что блоки синхронизации во второстепенных потоках и в главном создаются по монитору work класса WorkClass. Также в класс Worker добавлена коллекция events, которая выполняет роль счетчика завершенных потоков. Класс Vector выбран не случайно, тк его методы синхронизированы. В качестве его альтернативы можно воспользоваться Collections.synchronizedList(new ArrayList()). Хотя в данном случае синхронизация методов коллекции и не будет играть никакой роли, тк работа с ней происходит в блоке синхронизации. Все, теперь метод doSomeWork() разделит загрузку по FTP на 5 потоков и продолжит свою работу после завершения всех запущенных потоков. Общий результат выглядит так:
public class MainClass { public static void main(String[] args) { WorkClass work = new WorkClass(); work.doSomeWork(); } } public class WorkClass { protected static final Logger errorLogger = LoggerFactory.getLogger("errorLogger"); public void doSomeWork() { synchronized (this) { Vector events = new Vector(); for (int i = 0; i < 5; i++) { Worker worker = new Worker(this, "directory" + i, events); Thread thread = new Thread(worker); thread.start(); } //if all threads are dead, go ahead while (true) { try { this.wait(); } catch (InterruptedException e) { errorLogger.error(e.getMessage(), e); } if (events.size() == 5) { break; } } } //doing work on } public class Worker implements Runnable { private String dir; private Vector events; public Worker(WorkClass monitor, String dir, Vector events) { this.dir = dir; this.events = events; } @Override public void run() { doPieceOfWork(); } private void doPieceOfWork() { //doing loading from FTP, depending on the name of directory ... synchronized (monitor) { events.add(new Object()); monitor.notify(); } } } }
Комментариев нет:
Отправить комментарий