I have a system that uses multiple threads to process data. Each item could be processed individually, but it would be more efficient to process them in batches.
Let's assume we have a class Data, a class OtherData and a class Processor which implements Function<List<Data>, List<OtherData>>.
To process objects of type Data from multiple threads, I designed two classes: System and Evaluator.
    public class System {
        private final Evaluator evalThread;
        private final Object sync = new Object();
        private Function<List<Data>, List<OtherData>> processor;
        private Map<Object, Data> inputMap;
        private Map<Object, CompletableFuture<OtherData>> futureMap;
        private List<Object> idList;

        public System() {
            processor = new Processor();
            inputMap = new HashMap<>();
            futureMap = new HashMap<>();
            idList = new LinkedList<>();
            evalThread = new Evaluator(processor, inputMap, futureMap, idList, sync);
            Thread thread = new Thread(evalThread, "EvalThread");
            thread.start();
        }

        public CompletableFuture<OtherData> process(Data data) {
            Object id = new Object();
            final CompletableFuture<OtherData> completableFuture = new CompletableFuture<>();
            synchronized (sync) {
                inputMap.put(id, data);
                futureMap.put(id, completableFuture);
                idList.add(id);
                if (idList.size() >= 32) {
                    sync.notifyAll();
                }
            }
            return completableFuture;
        }
    }
    public class Evaluator implements Runnable {
        private final Function<List<Data>, List<OtherData>> processor;
        private final Map<Object, Data> inputMap;
        private final Map<Object, CompletableFuture<OtherData>> futureMap;
        private final List<Object> idList;
        private final Object sync;
        private AtomicBoolean keepRunning = new AtomicBoolean(true);

        public Evaluator(Function<List<Data>, List<OtherData>> processor,
                         Map<Object, Data> inputMap,
                         Map<Object, CompletableFuture<OtherData>> futureMap,
                         List<Object> idList,
                         Object sync) {
            this.processor = processor;
            this.inputMap = inputMap;
            this.futureMap = futureMap;
            this.idList = idList;
            this.sync = sync;
        }

        @Override
        public void run() {
            synchronized (sync) {
                while (keepRunning.get()) {
                    if (idList.size() > 0) {
                        List<Data> input = new LinkedList<>();
                        for (int i = 0; i < idList.size(); i++) {
                            input.add(inputMap.get(idList.get(i)));
                        }
                        List<OtherData> output = processor.apply(input);
                        for (int i = 0; i < idList.size(); i++) {
                            futureMap.get(idList.get(i)).complete(output.get(i));
                        }
                        idList.clear();
                        inputMap.clear();
                        futureMap.clear();
                    }
                    try {
                        sync.wait(20);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
My idea was that anyone can call process with a single Data object, but that object will (if enough requests have accumulated) be processed together with other Data objects in one batch.
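To make the intent concrete, here is a minimal, self-contained sketch of the same "submit one item, process in batches" idea. It is not the code above: it swaps the shared maps and wait/notify for a LinkedBlockingQueue and drainTo, and it takes whatever is already queued once the first item arrives, so batches may be smaller than in my design. The class name BatchingSketch, the Request holder, and the use of Integer and String as stand-ins for Data and OtherData are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical stand-ins: Integer plays the role of Data, String of OtherData.
class BatchingSketch implements AutoCloseable {
    // One pending request: the input plus the future its caller is holding.
    private static final class Request {
        final Integer input;
        final CompletableFuture<String> result = new CompletableFuture<>();
        Request(Integer input) { this.input = input; }
    }

    private static final int MAX_BATCH = 32;     // batch cap; value borrowed from the 32 in System.process
    private static final long MAX_WAIT_MS = 20;  // poll timeout; value borrowed from sync.wait(20)

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    private final Function<List<Integer>, List<String>> processor;
    private final Thread worker;

    BatchingSketch(Function<List<Integer>, List<String>> processor) {
        this.processor = processor;
        this.worker = new Thread(this::runLoop, "EvalThread");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    // Callers hand in a single item and get a future back, as with process(Data).
    CompletableFuture<String> process(Integer data) {
        Request request = new Request(data);
        queue.add(request);
        return request.result;
    }

    private void runLoop() {
        List<Request> batch = new ArrayList<>(MAX_BATCH);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Wait briefly for a first item, then take whatever else is already queued.
                Request first = queue.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue;
                }
                batch.add(first);
                queue.drainTo(batch, MAX_BATCH - 1);
                completeBatch(batch);
                batch.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // close() was called; leave the loop
        }
    }

    private void completeBatch(List<Request> batch) {
        try {
            List<Integer> inputs = new ArrayList<>(batch.size());
            for (Request r : batch) {
                inputs.add(r.input);
            }
            // The processor runs without any lock held, so callers are never blocked by it.
            List<String> outputs = processor.apply(inputs);
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).result.complete(outputs.get(i));
            }
        } catch (RuntimeException e) {
            // Fail the waiting callers instead of killing the worker thread.
            for (Request r : batch) {
                r.result.completeExceptionally(e);
            }
        }
    }

    @Override
    public void close() {
        worker.interrupt();
    }
}
```

A caller would create one BatchingSketch, call process(...) from any number of threads, and join or chain on the returned futures. Each future is completed with the output at the same index as its input in the batch, which assumes the processor returns one result per input, in order.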
Do you have any suggestions for improvement, or are there facilities in the Java standard library that would fit this task better?
Do you see any problems regarding deadlocks, etc.?