#StackBounty: #java #multithreading Process multiple request from different threads in batches on one processing thread

Bounty: 50

So I have a system that uses multiple threads to process data. These data could be processed individualy but it would be better to process them in batches.

Lets assume whe have a class Data, a class OtherData and a class Processor which implements Function<List<Data>, List<OtherData>>.

To process objects of type Data from mutiple threads I designed two classes System and Evaluator.

public class System {
    private final Evaluator evalThread;
    private final Object sync = new Object();
    private Function<List<Data>, List<OtherData>> processor;
    private Map<Object, Data> inputMap;
    private Map<Object, CompletableFuture<OtherData>> futureMap;
    private List<Object> idList;

    public System() {
        processor = new Processor();
        inputMap = new HashMap<>();
        futureMap = new HashMap<>();
        idList = new LinkedList<>();

        evalThread = new Evaluator(processor, inputMap, futureMap, idList, sync);
        Thread thread = new Thread(evalThread, "EvalThread");
        thread.start();
    }

    public CompletableFuture<OtherData> process(Data data) {
        Object id = new Object();

        final CompletableFuture<OtherData> completableFuture = new CompletableFuture<>();

        synchronized (sync) {
            inputMap.put(id, data);
            futureMap.put(id, completableFuture);
            idList.add(id);

            if (idList.size() >= 32) {
                sync.notifyAll();
            }
        }

        return completableFuture;
    }
}

public class Evaluator implements Runnable {
    private final Function<List<Data>, List<OtherData>> processor;
    private final Map<Object, Data> inputMap;
    private final Map<Object, CompletableFuture<OtherData>> futureMap;
    private final List<Object> idList;
    private final Object sync;

    private AtomicBoolean keepRunning = new AtomicBoolean(true);

    public Evaluator(Function<List<Data>, List<OtherData>> processor, Map<Object, Data> inputMap, Map<Object,
                      CompletableFuture<OtherData>> futureMap, List<Object> idList, Object sync) {
        this.processor = processor;
        this.inputMap = inputMap;
        this.futureMap = futureMap;
        this.idList = idList;
        this.sync = sync;
    }

    @Override
    public void run() {
        synchronized (sync) {
            while(keepRunning.get()) {
                if (idList.size() > 0) {
                    List<Data> input = new LinkedList<>();

                    for (int i = 0; i < idList.size(); i++) {
                        input.add(inputMap.get(idList.get(i)));
                    }

                    List<OtherData> output = processor.apply(input);

                    for (int i = 0; i < idList.size(); i++) {
                        futureMap.get(idList.get(i)).complete(output.get(i));
                    }

                    idList.clear();
                    inputMap.clear();
                    futureMap.clear();
                }

                try {
                    sync.wait(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

My idea was that any one can call process with singular data but the data will (if there are enough) processed together with other Data objects.

Any suggestions for improvement or are there systems in the java-framework that would fit this task better?
Do you might see Problems according to deadlocks, etc.?


Get this bounty!!!

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.