Skip to content
Snippets Groups Projects

Add test implementation of jobs/stages/tasks

Open Jonathan Mace requested to merge mace_countdownlatch into master
All threads resolved!
1 file
+ 330
0
Compare changes
  • Side-by-side
  • Inline
package testcases;
import boundarydetection.tracker.AccessTracker;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TestJobExecutor {
@Test
public void testSimpleJob() throws InterruptedException {
int numThreads = 1;
int taskSleepDuration = 20;
Executor e = new Executor(numThreads);
JobBuilder b = new JobBuilder();
b.appendStage(1, taskSleepDuration);
Job j = b.build(numThreads);
j.execute(e);
try {
j.awaitCompletion();
} finally {
e.shutdown();
}
e.join();
}
@Test
public void testMultipleSimpleJobsSingleThreaded() throws InterruptedException {
int numThreads = 1;
int numJobs = 10;
int taskSleepDuration = 20;
Executor e = new Executor(numThreads);
List<Job> jobs = new ArrayList<Job>();
for (int i = 0; i < numJobs; i++) {
JobBuilder b = new JobBuilder();
b.appendStage(1, taskSleepDuration);
jobs.add(b.build(numThreads));
}
for (Job j : jobs) {
j.execute(e);
}
try {
for (Job j : jobs) {
j.awaitCompletion();
}
} finally {
e.shutdown();
}
e.join();
}
@Test
public void testMultipleDependenciesWithinJob() throws InterruptedException {
int numThreads = 2;
int taskSleepDuration = 20;
Executor e = new Executor(numThreads);
JobBuilder b = new JobBuilder();
b.appendStage(1, taskSleepDuration);
b.appendStage(2, taskSleepDuration);
b.appendStage(1, taskSleepDuration);
Job j = b.build(numThreads);
j.execute(e);
try {
j.awaitCompletion();
} finally {
e.shutdown();
}
e.join();
}
@Test
public void testMultipleDependenciesWithinJobBigger() throws InterruptedException {
int numThreads = 2;
int taskSleepDuration = 20;
Executor e = new Executor(numThreads);
JobBuilder b = new JobBuilder();
b.appendStage(5, taskSleepDuration);
b.appendStage(20, taskSleepDuration);
b.appendStage(5, taskSleepDuration);
Job j = b.build(numThreads);
j.execute(e);
try {
j.awaitCompletion();
} finally {
e.shutdown();
}
e.join();
}
@Test
public void testMultipleBigJobsWithMultipleDependenciesMultiThreaded() throws InterruptedException {
int numThreads = 3;
int numJobs = 10;
int taskSleepDuration = 20;
Executor e = new Executor(numThreads);
List<Job> jobs = new ArrayList<Job>();
for (int i = 0; i < numJobs; i++) {
JobBuilder b = new JobBuilder();
b.appendStage(7, taskSleepDuration);
b.appendStage(25, taskSleepDuration);
b.appendStage(7, taskSleepDuration);
jobs.add(b.build(numThreads));
}
for (Job j : jobs) {
j.execute(e);
}
try {
for (Job j : jobs) {
j.awaitCompletion();
}
} finally {
e.shutdown();
}
e.join();
}
public static class Executor {
private volatile boolean alive = true;
public final int numthreads;
private Collection<ExecutorThread> threads = new ArrayList<>();
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
public Executor(int numthreads) {
this.numthreads = numthreads;
for (int i = 0; i < numthreads; i++) {
threads.add(new ExecutorThread());
}
for (ExecutorThread t : threads) {
t.start();
}
}
public void submit(Runnable r) {
queue.add(r);
}
public void shutdown() {
alive = false;
for (ExecutorThread thread : threads) {
thread.interrupt();
}
}
public void join() throws InterruptedException {
for (ExecutorThread thread : threads) {
thread.join();
}
}
private class ExecutorThread extends Thread {
public void run() {
Runnable r;
try {
while (alive) {
r = queue.take();
r.run();
}
} catch (InterruptedException e) {
}
}
}
}
public class Stage {
private final int maxConcurrency;
private BlockingQueue<Runnable> tasks;
private Collection<Stage> nextStages;
private AtomicInteger tasksRemaining;
public Stage(Collection<Runnable> tasks, int maxConcurrency) {
this(tasks, new ArrayList<>(), maxConcurrency);
}
public Stage(Collection<Runnable> tasks, Collection<Stage> nextStages, int maxConcurrency) {
this.tasks = new LinkedBlockingQueue<>(tasks);
this.nextStages = new ArrayList<Stage>(nextStages);
this.tasksRemaining = new AtomicInteger(tasks.size());
this.maxConcurrency = maxConcurrency;
}
public void execute(Executor e) {
for (int i = 0; i < maxConcurrency; i++) {
if (!submitTask(e)) break;
}
}
boolean submitTask(Executor e) {
Runnable r = tasks.poll();
if (r != null) {
e.submit(new Task(r, e));
return true;
}
return false;
}
void taskCompleted(Executor e) {
submitTask(e);
if (tasksRemaining.decrementAndGet() == 0) {
stageCompleted(e);
}
}
void stageCompleted(Executor e) {
for (Stage stage : nextStages) {
stage.execute(e);
}
}
public void awaitCompletion() throws InterruptedException {
while (tasksRemaining.get() > 0) {
Thread.sleep(100);
}
}
private class Task implements Runnable {
private final Executor e;
private final Runnable r;
public Task(Runnable r, Executor e) {
this.r = r;
this.e = e;
}
public void run() {
r.run();
taskCompleted(e);
}
}
}
public static class Job {
private final List<Stage> stages;
public Job(List<Stage> stages) {
this.stages = stages;
}
public void execute(Executor e) {
AccessTracker.startTask();
stages.get(0).execute(e);
AccessTracker.stopTask();
}
public void awaitCompletion() throws InterruptedException {
stages.get(stages.size()-1).awaitCompletion();
}
}
public class JobBuilder {
private List<Collection<Runnable>> tasks = new ArrayList<>();
public void appendStage(int numTasks, int taskSleepDuration) {
Collection<Runnable> stageTasks = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
stageTasks.add(new SleepRunnable(taskSleepDuration));
}
tasks.add(stageTasks);
}
public Job build(int maxConcurrency) {
List<Stage> allStages = new ArrayList<>();
Collection<Stage> nextStages = new ArrayList<>();
for (int i = tasks.size(); i > 0; i--) {
Collection<Runnable> stageTasks = this.tasks.get(i-1);
Stage s = new Stage(stageTasks, nextStages, maxConcurrency);
nextStages = new ArrayList<>();
nextStages.add(s);
allStages.add(0, s);
}
return new Job(allStages);
}
}
public static class SleepRunnable implements Runnable {
private final int millis;
public SleepRunnable(int millis) {
this.millis = millis;
}
public void run() {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Loading