#StackBounty: #python #multithreading #sockets #python-multithreading Python closing socket and connection in a threaded server?

Bounty: 50

We have a Python socket threaded server example. It is a slightly modified version from
https://stackoverflow.com/a/23828265/2008247. The example works and my tests confirm that it performs better than the blocking server.

But in the example, neither the socket nor the connection object is ever closed. Both objects have a close() method. (close() on a connection is only called on an exception; I would expect it to be called whenever a connection ends.) Don't we need to call them somehow? If so, how?

#!/usr/bin/env python

import socket
import threading

class ThreadedServer():

    def __init__(self, host, port):

        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((self.host, self.port))

    def listen(self):

        self.sock.listen(5)

        while True:

            con, address = self.sock.accept()
            con.settimeout(60)
            threading.Thread(target=self.listenToClient,
                             args=(con, address)).start()

    def listenToClient(self, con, address):

        while True:
            try:
                data = con.recv(1024)
                if data:
                    # Set the response to echo back the received data
                    response = data
                    con.send(response)
                else:
                    raise Exception('Client disconnected')
            except:
                con.close()
                return False


def main():

    ThreadedServer('', 8001).listen()


if __name__ == "__main__":
    main()
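One way to guarantee that each connection is closed on every exit path, not just on exceptions, is a try/finally around the receive loop (the listening socket can be closed the same way around listen()). A minimal sketch of the handler, assuming the same echo behaviour as above:

```python
import socket
import threading

def listen_to_client(con, address):
    """Echo received data back until the client disconnects, then always close."""
    try:
        while True:
            data = con.recv(1024)
            if not data:           # empty bytes object => client closed its end
                break
            con.sendall(data)      # echo the received data back
    except OSError:
        pass                       # timeout or transport error: fall through to close
    finally:
        con.close()                # runs on every exit path, normal or not
```

In Python 3, sockets also support the context-manager protocol, so `with con:` around the loop achieves the same cleanup.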


Get this bounty!!!

#StackBounty: #java #multithreading Process multiple request from different threads in batches on one processing thread

Bounty: 50

So I have a system that uses multiple threads to process data. Each piece of data could be processed individually, but it would be better to process them in batches.

Let's assume we have a class Data, a class OtherData, and a class Processor which implements Function<List<Data>, List<OtherData>>.

To process objects of type Data from multiple threads, I designed two classes, System and Evaluator.

public class System {
    private final Evaluator evalThread;
    private final Object sync = new Object();
    private Function<List<Data>, List<OtherData>> processor;
    private Map<Object, Data> inputMap;
    private Map<Object, CompletableFuture<OtherData>> futureMap;
    private List<Object> idList;

    public System() {
        processor = new Processor();
        inputMap = new HashMap<>();
        futureMap = new HashMap<>();
        idList = new LinkedList<>();

        evalThread = new Evaluator(processor, inputMap, futureMap, idList, sync);
        Thread thread = new Thread(evalThread, "EvalThread");
        thread.start();
    }

    public CompletableFuture<OtherData> process(Data data) {
        Object id = new Object();

        final CompletableFuture<OtherData> completableFuture = new CompletableFuture<>();

        synchronized (sync) {
            inputMap.put(id, data);
            futureMap.put(id, completableFuture);
            idList.add(id);

            if (idList.size() >= 32) {
                sync.notifyAll();
            }
        }

        return completableFuture;
    }
}

public class Evaluator implements Runnable {
    private final Function<List<Data>, List<OtherData>> processor;
    private final Map<Object, Data> inputMap;
    private final Map<Object, CompletableFuture<OtherData>> futureMap;
    private final List<Object> idList;
    private final Object sync;

    private AtomicBoolean keepRunning = new AtomicBoolean(true);

    public Evaluator(Function<List<Data>, List<OtherData>> processor, Map<Object, Data> inputMap, Map<Object,
                      CompletableFuture<OtherData>> futureMap, List<Object> idList, Object sync) {
        this.processor = processor;
        this.inputMap = inputMap;
        this.futureMap = futureMap;
        this.idList = idList;
        this.sync = sync;
    }

    @Override
    public void run() {
        synchronized (sync) {
            while(keepRunning.get()) {
                if (idList.size() > 0) {
                    List<Data> input = new LinkedList<>();

                    for (int i = 0; i < idList.size(); i++) {
                        input.add(inputMap.get(idList.get(i)));
                    }

                    List<OtherData> output = processor.apply(input);

                    for (int i = 0; i < idList.size(); i++) {
                        futureMap.get(idList.get(i)).complete(output.get(i));
                    }

                    idList.clear();
                    inputMap.clear();
                    futureMap.clear();
                }

                try {
                    sync.wait(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

My idea was that anyone can call process with a single Data object, but the data will (if enough items have accumulated) be processed together with other Data objects.
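The intended pattern — many threads submitting single items, one consumer thread processing them in batches and completing per-item futures — can also be sketched with a thread-safe queue; here in Python for illustration, with process_batch standing in for the Processor function:

```python
import queue
import threading
from concurrent.futures import Future

def start_batcher(process_batch, batch_size=32, timeout=0.02):
    """Run a daemon thread that drains (item, future) pairs and processes
    them in batches, completing each future with its matching result."""
    q = queue.Queue()

    def worker():
        while True:
            items = [q.get()]                      # block for the first item
            while len(items) < batch_size:
                try:                               # then drain what arrives quickly
                    items.append(q.get(timeout=timeout))
                except queue.Empty:
                    break
            results = process_batch([d for d, _ in items])
            for (_, fut), res in zip(items, results):
                fut.set_result(res)

    threading.Thread(target=worker, daemon=True).start()

    def submit(data):
        fut = Future()
        q.put((data, fut))
        return fut

    return submit
```

Each call to submit() returns a Future that completes once the batch containing that item has been processed, which is the same contract as the CompletableFuture returned by process() above.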

Any suggestions for improvement, or are there facilities in the Java framework that would fit this task better?
Do you see any potential problems, such as deadlocks?


Get this bounty!!!

#StackBounty: #java #spring #multithreading #spring-batch #partitioning Is it possible to partition across single file in spring batch?

Bounty: 50

I have read about partitioning in Spring Batch and found an example which demonstrates it. The example reads persons from CSV files, does some processing, and inserts the data into a database. In this example 1 partition = 1 file, and so the partitioner implementation looks like this:

public class MultiResourcePartitioner implements Partitioner {

    private final Logger logger = LoggerFactory.getLogger(MultiResourcePartitioner.class);
    public static final String FILE_PATH = "filePath";

    private static final String PARTITION_KEY = "partition";

    private final Collection<Resource> resources;


    public MultiResourcePartitioner(Collection<Resource> resources) {
        this.resources = resources;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        int i = 0;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            context.putString(FILE_PATH, getPath(resource)); //Depends on what logic you want to use to split
            map.put(PARTITION_KEY + i++, context);
        }
        return map;
    }

    private String getPath(Resource resource) {
        try {
            return resource.getFile().getPath();
        } catch (IOException e) {
            logger.warn("Can't get file from from resource {}", resource);
            throw new RuntimeException(e);
        }
    }
}

But what if I have a single 10TB file? Does Spring Batch allow partitioning it in some way?

update:

I tried the following approach to achieve what I want:

make 2 steps – a first step to divide the file into pieces, and a second step to process the pieces we got from the first step:

@Configuration
public class SingleFilePartitionedJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ToLowerCasePersonProcessor toLowerCasePersonProcessor;

    @Autowired
    private DbPersonWriter dbPersonWriter;

    @Autowired
    private ResourcePatternResolver resourcePatternResolver;

    @Value("${app.file-to-split}")
    private Resource resource;


    @Bean
    public Job splitFileProcessingJob() throws IOException {
        return jobBuilderFactory.get("splitFileProcessingJob")
                .incrementer(new RunIdIncrementer())
                .flow(splitFileIntoPiecesStep())
                .next(csvToDbLowercaseMasterStep())
                .end()
                .build();
    }

    private Step splitFileIntoPiecesStep() throws IOException {
        return stepBuilderFactory.get("splitFile")
                .tasklet(new FileSplitterTasklet(resource.getFile()))
                .build();
    }

    @Bean
    public Step csvToDbLowercaseMasterStep() throws IOException {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        partitioner.setResources(resourcePatternResolver.getResources("split/*.csv"));
        return stepBuilderFactory.get("csvReaderMasterStep")
                .partitioner("csvReaderMasterStep", partitioner)
                .gridSize(10)
                .step(csvToDataBaseSlaveStep())
                .taskExecutor(jobTaskExecutorSplitted())
                .build();
    }

    @Bean
    public Step csvToDataBaseSlaveStep() throws MalformedURLException {
        return stepBuilderFactory.get("csvToDatabaseStep")
                .<Person, Person>chunk(50)
                .reader(csvPersonReaderSplitted(null))
                .processor(toLowerCasePersonProcessor)
                .writer(dbPersonWriter)
                .build();

    }

    @Bean
    @StepScope
    public FlatFileItemReader csvPersonReaderSplitted(@Value("#{stepExecutionContext[fileName]}") String fileName) throws MalformedURLException {
        return new FlatFileItemReaderBuilder()
                .name("csvPersonReaderSplitted")
                .resource(new UrlResource(fileName))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();

    }

    @Bean
    public TaskExecutor jobTaskExecutorSplitted() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setCorePoolSize(25);
        taskExecutor.setThreadNamePrefix("cust-job-exec2-");
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

}

tasklet:

public class FileSplitterTasklet implements Tasklet {
    private final Logger logger = LoggerFactory.getLogger(FileSplitterTasklet.class);
    private File file;

    public FileSplitterTasklet(File file) {
        this.file = file;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        int count = FileSplitter.splitTextFiles(file, 100);
        logger.info("File was split on {} files", count);
        return RepeatStatus.FINISHED;

    }
}

logic for splitting file:

  public static int splitTextFiles(File bigFile, int maxRows) throws IOException {    
        int fileCount = 1;
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(bigFile.getPath()))) {
            String line = null;
            int lineNum = 1;
            Path splitFile = Paths.get(bigFile.getParent() + "/" + fileCount + "split.txt");
            BufferedWriter writer = Files.newBufferedWriter(splitFile, StandardOpenOption.CREATE);

            while ((line = reader.readLine()) != null) {

                if (lineNum > maxRows) {
                    writer.close();
                    lineNum = 1;
                    fileCount++;
                    splitFile = Paths.get("split/" + fileCount + "split.txt");
                    writer = Files.newBufferedWriter(splitFile, StandardOpenOption.CREATE);
                }

                writer.append(line);
                writer.newLine();
                lineNum++;
            }
            writer.close();
        }

        return fileCount;
    }

So I put all the file pieces into a dedicated directory.

But this doesn’t work, because at the moment of context initialization the /split folder does not exist yet.

update

I’ve come up with a workaround which works:

public class MultiResourcePartitionerWrapper implements Partitioner {
    private final MultiResourcePartitioner multiResourcePartitioner = new MultiResourcePartitioner();
    private final ResourcePatternResolver resourcePatternResolver;
    private final String pathPattern;

    public MultiResourcePartitionerWrapper(ResourcePatternResolver resourcePatternResolver, String pathPattern) {
        this.resourcePatternResolver = resourcePatternResolver;
        this.pathPattern = pathPattern;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        try {
            Resource[] resources = resourcePatternResolver.getResources(pathPattern);
            multiResourcePartitioner.setResources(resources);
            return multiResourcePartitioner.partition(gridSize);

        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

But it looks ugly. Is this a correct solution?


Get this bounty!!!

#StackBounty: #python #multithreading #keras #deep-learning #multiprocessing How can take advantage of multiprocessing and multithreadi…

Bounty: 50

I’d assume that most frameworks like Keras/TensorFlow/… automatically use all CPU cores, but in practice it seems they do not. I could find only a few sources that lead us to use the whole capacity of the CPU during the deep learning process. I found an article about the usage of

from multiprocessing import Pool 
import psutil
import ray 

On the other hand, based on this answer about using a Keras model in multiple processes, there is no trace of the above-mentioned libraries. Is there a more elegant way to take advantage of multiprocessing for Keras, since it’s very popular for implementation?

  • For instance, how can I modify the following simple RNN implementation to use at least 50% of the CPU's capacity during the learning process?

  • Should I use a 2nd model for multitasking, like the LSTM which I commented out below? I mean, can we manage to run multiple models simultaneously by using more of the CPU's capacity?

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from keras.layers.normalization import BatchNormalization
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import Activation
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score

df = pd.read_csv("D:Train.csv", header=None)

index = [i for i in list(range(1440)) if i%3==2]

Y_train= df[index]
df = df.values

#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
    dataX,dataY = [],[]
    print("Len:",len(dataset)-look_back-1)
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back,  :])
    return np.array(dataX), np.array(dataY)

Y_train=np.array(Y_train)
df=np.array(df)

look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)

#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)



model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)


#predict

Y_train=np.array(trainY)
Y_test=np.array(testX)

Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)

train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)

# create and fit the Simple LSTM model as 2nd model for multi-tasking

#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)

#Y_train=np.array(trainY)
#Y_test=np.array(testX)

#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)

#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)

Note: I don’t have access to CUDA; I just have a powerful server without a GPU. My aim is to take advantage of multiprocessing and multithreading to use the maximum capacity of the CPU instead of 30%, which means just one core, while I have a quad-core!
Any advice would be greatly appreciated. I have uploaded a formatted CSV dataset.
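One common way to run several models concurrently on CPU is to give each model its own process, since separate processes are not limited by Python's GIL. A minimal sketch of the pattern with the standard multiprocessing module — train_one is a placeholder where a real build/compile/fit of, say, the RNN and LSTM models above would go:

```python
import multiprocessing as mp

def train_one(config, results):
    """Placeholder for building, compiling and fitting one model.
    Import keras *inside* this function so each process gets its own session."""
    name, units = config
    # model = build_model(units); model.fit(...)   # hypothetical training calls
    results.put((name, units * 2))                 # stand-in for a real metric

def train_in_parallel(configs):
    # One process per model; each can use its own CPU cores independently.
    results = mp.Queue()
    procs = [mp.Process(target=train_one, args=(c, results)) for c in configs]
    for p in procs:
        p.start()
    out = dict(results.get() for _ in configs)
    for p in procs:
        p.join()
    return out
```

Within a single process, TensorFlow's intra-op/inter-op thread-pool settings also influence how many cores one fit() call uses; whether they help depends on the ops in the model.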


Get this bounty!!!

#StackBounty: #c++ #linux #multithreading #qt #glibc Qt multi-thread application freezes and several threads wait for the same mutex

Bounty: 100

I encountered a strange problem with my Qt-based multi-threaded application. After several days of running, the application freezes without any response.

After the freeze occurred, I can confirm that several threads, including the main thread, are in futex_wait_queue_me status. When I attach GDB to the application to investigate the thread status, the backtraces of those threads
show that they all stopped in the following function with the same argument futex=0x45a2f8b8 <main_arena>.

__lll_lock_wait_private (futex=0x45a2f8b8 <main_arena>)

I know that on Linux, using non-async-signal-safe functions within signal handlers is one possible cause of this status, i.e. several threads waiting for the same mutex (I can confirm from the backtraces that they all stopped in malloc()/free()-related function calls), but after checking my Qt application, I cannot find any code related to Linux signal handlers. (I am not sure whether the Qt core library uses Linux signal handlers in its signal/slot mechanism.)

I am sorry that I cannot provide source code for this question because it is a huge project. Could you tell me some possible reasons for this phenomenon, or give some advice on how to debug it?

Thanks in advance.

UPDATE:

I can provide backtraces, but I’m sorry that I had to remove some sensitive information.

Backtrace of sub thread:

#0 in __lll_lock_wait_private (futex=0x4ad078b8 <main_arena>)
#1 in __GI___libc_malloc (bytes=32) at malloc.c:2918
... ...
#11 in SystemEventImp::event(QEvent*) () 
#12 in QApplicationPrivate::notify_helper(QObject*, QEvent*) ()
#13 in QApplication::notify(QObject*, QEvent*) ()
#14 in QCoreApplication::notifyInternal(QObject*, QEvent*) ()
#15 in QCoreApplicationPrivate::sendPostedEvents(QObject*, int, QThreadData*) ()
#16 in QCoreApplication::sendPostedEvents (receiver=0x0, event_type=0) at kernel/qcoreapplication.cpp:1329
#17 in QWindowSystemInterface::sendWindowSystemEvents (flags=...) at kernel/qwindowsysteminterface.cpp:560
#18 in QUnixEventDispatcherQPA::processEvents (this=0x8079958, flags=...) at eventdispatchers/qunixeventdispatcher.cpp:70
#19 in QEventLoop::processEvents (this=0xbfffef50, flags=...) at kernel/qeventloop.cpp:136
#20 in QEventLoop::exec (this=0xbfffef50, flags=...) at kernel/qeventloop.cpp:212
#21 in QCoreApplication::exec () at kernel/qcoreapplication.cpp:1120
#22 in QGuiApplication::exec () at kernel/qguiapplication.cpp:1220
#23 in QApplication::exec () at kernel/qapplication.cpp:2689
#24 in main(argc=2, argv=0xbffff294)

Backtrace of main thread:

#0 in __lll_lock_wait_private (futex=0x4ad078b8 <main_arena>) at ../ports/sysdeps/unix/sysv/linux/arm/nptl/lowlevellock.c:32
#1 in __GI___libc_malloc (bytes=8) at malloc.c:2918
... ...
#15 in QGraphicsView::paintEvent(QPaintEvent*) ()
#16 in QWidget::event(QEvent*) () 
#17 in QFrame::event(QEvent*) () 
#18 in QGraphicsView::viewportEvent(QEvent*) ()
#19 in Platform::Drawing::GraphicsView::viewportEvent(QEvent*) ()
#20 in QAbstractScrollAreaFilter::eventFilter(QObject*, QEvent*) ()
#21 in QCoreApplicationPrivate::cancel_handler(QObject*, QEvent*) ()
#22 in QApplicationPrivate::notify_helper(QObject*, QEvent*) ()
#23 in QApplication::notify(QObject*, QEvent*) ()
#24 in QCoreApplication::notifyInternal(QObject*, QEvent*) ()
#25 in QWidgetPrivate::drawWidget(QPaintDevice*, QRegion const&, QPoint const&, int, QPainter*, QWidgetBackingStore*) [clone .part.175] () 
#26 in QWidgetBackingStore::sync() ()
#27 in QWidgetPrivate::syncBackingStore() ()
#28 in QWidget::event(QEvent*) ()
#29 in QApplicationPrivate::notify_helper(QObject*, QEvent*) ()
#30 in QApplication::notify(QObject*, QEvent*) ()
#31 in QCoreApplication::notifyInternal(QObject*, QEvent*) ()
#32 in QCoreApplicationPrivate::sendPostedEvents(QObject*, int, QThreadData*) ()
#33 in QCoreApplication::sendPostedEvents (receiver=0x809ea50, event_type=77)
#34 in QGraphicsViewPrivate::dispatchPendingUpdateRequests (this=0x80e4418)
#35 in QGraphicsScenePrivate::_q_processDirtyItems (this=0x80de238) at graphicsview/qgraphicsscene.cpp:508
#36 in QGraphicsScene::qt_static_metacall (_o=0x80d1a80, _c=QMetaObject::InvokeMetaMethod, _id=15, _a=0x865e238)
#37 in QMetaCallEvent::placeMetaCall (this=0x898d020, object=0x80d1a80)
#38 in QObject::event (this=0x80d1a80, e=0x898d020) at kernel/qobject.cpp:1070
#39 in QGraphicsScene::event (this=0x80d1a80, event=0x898d020) at graphicsview/qgraphicsscene.cpp:3478
#40 in QApplicationPrivate::notify_helper (this=0x8077ba0, receiver=0x80d1a80, e=0x898d020) at kernel/qapplication.cpp:3457
#41 in QApplication::notify (this=0x8077970, receiver=0x80d1a80, e=0x898d020) at kernel/qapplication.cpp:2878
#42 in QCoreApplication::notifyInternal (this=0x8077970, receiver=0x80d1a80, event=0x898d020) at kernel/qcoreapplication.cpp:867
#43 in QCoreApplication::sendEvent (receiver=0x80d1a80, event=0x898d020) at ../../include/QtCore/../../src/corelib/kernel/qcoreapplication.h:232
#44 in QCoreApplicationPrivate::sendPostedEvents (receiver=0x0, event_type=0, data=0x8073318) at kernel/qcoreapplication.cpp:1471
#45 in QCoreApplication::sendPostedEvents (receiver=0x0, event_type=0) at kernel/qcoreapplication.cpp:1329
#46 in QWindowSystemInterface::sendWindowSystemEvents (flags=...) at kernel/qwindowsysteminterface.cpp:560
#47 in QUnixEventDispatcherQPA::processEvents (this=0x8079958, flags=...) at eventdispatchers/qunixeventdispatcher.cpp:70
#48 in QEventLoop::processEvents (this=0xbfffef50, flags=...) at kernel/qeventloop.cpp:136
#49 in QEventLoop::exec (this=0xbfffef50, flags=...) at kernel/qeventloop.cpp:212
#50 in QCoreApplication::exec () at kernel/qcoreapplication.cpp:1120
#51 in QGuiApplication::exec () at kernel/qguiapplication.cpp:1220
#52 in QApplication::exec () at kernel/qapplication.cpp:2689
#53 in main(argc=2, argv=0xbffff294)


Get this bounty!!!

#StackBounty: #java #multithreading #error-handling #api #http HTTP multithreaded connections with retry and session update

Bounty: 50

I have a task to make a multithreaded HTTP method. This method is the entry point of my library, which is used by a bigger project.

The flow is like this:

ProjectA --> My Entry Point Lib --> HttpLibrary

The main features of this method are:

  • Accept multiple requests
  • Check for HTTP status codes and act appropriately
  • If the session fails, try updating the session once and retry the main API call

Here is what I have come up with so far:

 private JsonHttpResult execJsonHttpMethod(final int requestId, BaseRequest request, int maxRequestCount) throws BaseApiException, NullPointerException {
        if (maxRequestCount <= 0) {
            maxRequestCount = REQUESTS_ATTEMPTS_COUNT;
        }
        int timeToWaitBeforeNextRequestIncaseError = 500;
        HttpRetryManager httpRetryManager = new HttpRetryManager(maxRequestCount, timeToWaitBeforeNextRequestIncaseError); // Instance created for each thread

        int count = 0; // Only for logging purpose - thread safe | thread local variable
        JsonHttpResult result = null;
        while (httpRetryManager.canRetry()) {
            count++;
            try {
                reloginLock.readLock().lock();
                final HttpMethod httpMethod = compileHttpMethod(request);
                printRequestLog(request, "execJsonHttpMethod[request]: requestId=" + requestId + ", count=" + count + ", url=" + httpMethod.getURI(), false);
                result = mTransportProvider.execJsonHttpMethod(httpMethod);


                boolean isResEmpty = errorIfResponseEmpty(result);
                if (isResEmpty) {
                    try {
                        httpRetryManager.onErrorUpdateCountAndSleep(new HttpStatusException(result.getHttpStatus(), "Unexpected empty response body"));
                    } catch (HttpStatusException responseEmpty) {
                        responseEmpty.printStackTrace();
                        throw responseEmpty;
                    }
                }

                boolean isSessionInvalid = errorIfResponseUnAuthorized(result);
                if (isSessionInvalid) {
                    if (updateSessionIncaseApiGot401()) {
                        try {
                            //In-case session does not update, logout the user by escalating 401 to 403 ( InvalidSessionException() ---> InvalidUserException())
                            httpRetryManager.onErrorUpdateCountAndSleep(new InvalidUserException());
                            continue; //Start next iteration - no need to check for the rest of the conditions
                        } catch (InvalidUserException exceptionSessEsclatedToInvalidUser) {
                            exceptionSessEsclatedToInvalidUser.printStackTrace();
                            throw exceptionSessEsclatedToInvalidUser;
                        }
                    } else {
                        throw new InvalidUserException(); // WILL TRY ONLY ONCE - IF UPDATE FAILS LOGOUT - 403
                    }
                }

                //Logout user incase API threw 403 - Pops up the stack, no retry action required
                if (errorIfResponseForbidden(result)) {
                    throw new InvalidUserException();
                }

                boolean isHttpErrCodeNotOk = errorIfHttpStatusNotOk(result);
                if (!isHttpErrCodeNotOk) {
                    if (result.getHttpStatus() != HttpStatus.SC_OK && result.getHttpStatus() != HttpStatus.SC_CREATED
                            && result.getHttpStatus() != HttpStatus.SC_NO_CONTENT) {
                        if (result.httpError != null) {
                            throw new HttpStatusDetailedException(result.getHttpStatus(), "Unexpected HTTP(S) result: " + result.toString(), result.httpError);
                        } else {
                            throw new HttpStatusException(result.getHttpStatus(), "Unexpected HTTP(S) result: " + result.toString());
                        }
                    }
                }
                //Success
                HttpUUIDController.getInstance().handleSuccessRequest(request);
                break;
            } catch (TransportLevelException ex) {
                printRequestLog(request, "execJsonHttpMethod[error]: request failed, requestId=" + requestId + ", count=" + count + ", ex=" + ex, false);
                try {
                    httpRetryManager.onErrorUpdateCountAndSleep(ex);
                } catch (Exception transportException) {
                    transportException.printStackTrace();
                    throw transportException;
                }
            } catch (URIException ex) {
                printRequestLog(request, "execJsonHttpMethod[error]: request failed, ex=" + ex.getMessage(), false);
                throw new InvalidUrlException();
            } finally {
                printRequestLog(request, "execJsonHttpMethod[result]: finished, requestId=" + requestId + ", count=" + count + ", result=" + result, false);
                reloginLock.readLock().unlock();
            }
        }
        simpleLog("Loop has been exited, Iterations: " + count);
        return result;
    }

These are my helper methods:

 private boolean errorIfResponseEmpty(HttpResult result) {
        if (result.getHttpStatus() == HttpStatus.SC_OK && result.getHttpResponse() == null) {
            printResponseLog(result.getHttpStatus() + " Unexpected empty response body", true);
            return true;
        }
        return false;
    }

    private boolean errorIfResponseUnAuthorized(HttpResult result) {
        if (result.getHttpStatus() == HttpStatus.SC_UNAUTHORIZED) {
            printResponseLog(result.getHttpStatus() + " Session Failed", true);
            return true;
        }
        return false;

    }

    private boolean errorIfHttpStatusNotOk(HttpResult result) {
        if (result.getHttpStatus() != HttpStatus.SC_OK && result.getHttpStatus() != HttpStatus.SC_CREATED && result.getHttpStatus() != HttpStatus.SC_NO_CONTENT) {
            if (result.httpError != null) {
                printResponseLog(result.getHttpStatus() + " Unexpected HTTP(S) result: " + result.toString() + " " + result.httpError, true);
            } else {
                printResponseLog(result.getHttpStatus() + " Unexpected HTTP(S) result: " + result.toString(), true);
            }
            return false;
        }
        return true;
    }


    private boolean errorIfResponseForbidden(HttpResult result) {
        return (result.getHttpStatus() == HttpStatus.SC_FORBIDDEN);

    }

And finally my HttpRetryManager:

import com.api.Constants;
import com.api.http.exception.BaseApiException;

public class HttpRetryManager {

    private int leftRetries;
    private long timeToWait;
    private static final boolean LOG = Constants.LOG;
    private static final String TAG = HttpRetryManager.class.getCanonicalName();

    public HttpRetryManager(int retries, long timeToWait) {
        this.timeToWait = timeToWait;
        leftRetries = retries;
        printDebugLog("Init: HttpRetryManager, this thread will retry " + leftRetries + " times");
    }

    public boolean canRetry() {
        return leftRetries > 0;
    }

    public void onErrorUpdateCountAndSleep(BaseApiException ex) throws BaseApiException {
        leftRetries--;
        printDebugLog("onErrorUpdateCountAndSleep: RetriesLeft: " + leftRetries);
        if (!canRetry()) {
            throw ex;
        }
        waitUntilNextTry();
    }

    private long getTimeToWait() {
        return timeToWait;
    }

    private void waitUntilNextTry() {
        try {
            Thread.sleep(getTimeToWait());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void printDebugLog(String message) {
        if (LOG) {
            ApiLogger.d(TAG, message);
        }
    }
}
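The retry bookkeeping itself is a small, self-contained pattern; for comparison, the same idea (count down, sleep between attempts, re-raise the last error once attempts are exhausted) sketched in Python:

```python
import time

def retry(func, retries=3, wait=0.5, retryable=(ConnectionError,)):
    """Call func(); on a retryable error, sleep and try again,
    re-raising the last error once the attempts are exhausted."""
    attempts_left = retries
    while True:
        try:
            return func()
        except retryable as ex:
            attempts_left -= 1
            if attempts_left <= 0:
                raise              # out of retries: propagate to the caller
            time.sleep(wait)
```

Keeping the retry policy separate from the status-code handling, as HttpRetryManager already does, makes both halves easier to test in isolation.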


Get this bounty!!!

#StackBounty: #python #object-oriented #multithreading #design-patterns #opencv Image capture client – multi-threading + sharing data b…

Bounty: 100

I’m working on a small side project at the moment – like a homemade CCTV system.

This part is my Python Capture Client – it uses OpenCV to capture frames from a connected webcam and sends the frames to a connected server via a socket.

The main thing I was going for was a small application with two services which operate independently once started: one for capturing frames from the camera, and another for sending and receiving network messages. If either of these fails, the other still works with no issues.

I have more or less achieved this but I’m not certain that I took the best approach – I’m not normally a Python developer so I sort of winged it with this application.

Things I felt especially strange about were the use of queues. From my searching, they seemed to be the best way for sharing data between threads.
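
For threads within a single process, the standard library's `queue.Queue` is indeed the usual thread-safe channel (the `multiprocessing.Manager().Queue()` objects used below are only strictly needed when the workers are separate processes). A minimal producer/consumer sketch of the pattern — the worker functions and sentinel convention here are illustrative, not part of the project:

```python
import queue
import threading

def producer(q: queue.Queue, items):
    # Put each item on the shared queue; Queue handles the locking internally
    for item in items:
        q.put(item)
    q.put(None)  # sentinel telling the consumer to stop

def consumer(q: queue.Queue, results: list):
    # Pull items until the sentinel arrives
    while True:
        item = q.get()
        if item is None:
            break
        results.append(item * 2)

q = queue.Queue(maxsize=10)
results = []
t1 = threading.Thread(target=producer, args=(q, [1, 2, 3]))
t2 = threading.Thread(target=consumer, args=(q, results))
t1.start(); t2.start()
t1.join(); t2.join()
print(results)  # [2, 4, 6]
```

A bounded `maxsize` gives the same back-pressure effect as the `FRAME_QUEUE_SIZE_LIMIT` below: a producer that outruns its consumer blocks (or can be made to drop frames) instead of growing memory without limit.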

The application can be found here – any advice or comments would be appreciated!

This is the main entry point into the application:

main.py

from orchestrator import Orchestrator
from connection_service import ConnectionService
from capture_service import CaptureService

HOST = "127.0.0.1"
PORT = 11000

def main():
    capture_service = CaptureService()
    connection_service = ConnectionService(HOST, PORT)
    orchestrator = Orchestrator(capture_service, connection_service)

    orchestrator.start()

if __name__ == '__main__':
    main()

This is my orchestration service – it coordinates the main loop of retrieving frames + sending to the server:

orchestrator.py

from connection_service import ConnectionService
from capture_service import CaptureService
from not_connected_exception import NotConnectedException

import multiprocessing
import cv2
import time

class Orchestrator():

    def __init__(self, capture_service, connection_service):
        self.manager = multiprocessing.Manager()

        self.connection_service = connection_service
        self.capture_service = capture_service

        self.SEND_FOOTAGE = True   
        self.DETECT_MOTION = False

        self.RUN = True

    # End services
    def finish(self):
        self.RUN = False
        self.connection_service.disconnect()
        self.capture_service.stop_capture()

    # Start services, connect to server / start capturing from camera
    # Grab frames from capture service and display
    # Retrieve any messages from connection service
    # Deal with message e.g stop / start sending frames
    # If send footage is true, encode frame as string and send
    def start(self):
        print ("Starting Orchestration...")

        self.connection_service.connect()
        self.capture_service.start_capture()
        while self.RUN:
            message = None

            #Get camera frames
            frame = self.capture_service.get_current_frame()

            self.display_frame(frame)

            message = self.connection_service.get_message()

            self.handle_message(message)

            #Send footage if requested
            if self.SEND_FOOTAGE and frame is not None: #or (self.DETECT_MOTION and motion_detected):
                try:
                    frame_data = cv2.imencode('.jpg', frame)[1].tobytes()
                    self.connection_service.send_message(frame_data)

                except NotConnectedException as e:
                    self.connection_service.connect()

    def handle_message(self, message):
        if message == "SEND_FOOTAGE":
            self.SEND_FOOTAGE = True

        elif message == "STOP_SEND_FOOTAGE":
            self.SEND_FOOTAGE = False

        elif message == "DETECT_MOTION":
            self.DETECT_MOTION = True

        elif message == "STOP_DETECT_MOTION":
            self.DETECT_MOTION = False

    def display_frame(self, frame):
        if frame is not None:
            # Display the resulting frame
            cv2.imshow('orchestrator', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                cv2.destroyAllWindows()
                raise SystemExit("Exiting...")

This is my capturing service – its job is to capture frames from the camera and put the frames onto a queue:

capture_service.py

import cv2
import multiprocessing

class CaptureService():

    FRAME_QUEUE_SIZE_LIMIT = 10
    STOP_QUEUE_SIZE_LIMIT = 1
    START_QUEUE_SIZE_LIMIT = 1

    def __init__(self):
        self.frame = None
        manager = multiprocessing.Manager()

        # The queue to add frames to
        self.frame_queue = manager.Queue(self.FRAME_QUEUE_SIZE_LIMIT)

        # A queue to indicate capturing should be stopped
        self.stop_queue = manager.Queue(self.STOP_QUEUE_SIZE_LIMIT)

        # A queue to indicate that capturing should be started
        self.start_queue = manager.Queue(self.START_QUEUE_SIZE_LIMIT)

    # Start Capture
    # Empty the stop queue. If the start queue is empty - start a new capture thread
    # If start queue is not empty, service has already been started
    def start_capture(self):
        print ("Starting capture...")
        while not self.stop_queue.empty():
            self.stop_queue.get()

        if self.start_queue.empty():
            self.capture_thread = multiprocessing.Process(target=self.capture_frames)
            self.capture_thread.start()
            self.start_queue.put("")
            print ("Capturing started...")
        else:
            print ("Capture already started...")

    # Is Capturing
    # Return true if start queue has a value
    def is_capturing(self):
        return not self.start_queue.empty()

    # Get Current Frame
    # Return the current frame from the frame queue
    def get_current_frame(self):
        if not self.frame_queue.empty():
            return self.frame_queue.get()

        return None

    # Stop Capture
    # Add a message to the stop queue
    # Empty the start queue
    def stop_capture(self):
        if self.stop_queue.empty():
            self.stop_queue.put("")

        while not self.start_queue.empty():
            self.start_queue.get()

    # Capture Frames
    # Captures frames from the device at 0
    # Only add frames to queue if there's space
    def capture_frames(self):
        cap = None
        try:
            cap = cv2.VideoCapture(0)
            while True:
                #Empty Start / Stop queue signals
                if not self.stop_queue.empty():
                    while not self.stop_queue.empty():
                        self.stop_queue.get()
                    while not self.start_queue.empty():
                        self.start_queue.get()
                    break

                ret, frame = cap.read()

                if self.frame_queue.qsize() > self.FRAME_QUEUE_SIZE_LIMIT or self.frame_queue.full():
                    continue

                self.frame_queue.put(frame)

            # When everything done, release the capture
            cap.release()
            cv2.destroyAllWindows()

        except Exception as e:
            print ("Exception capturing images, stopping...")
            self.stop_capture()
            cv2.destroyAllWindows()
            if cap is not None:
                cap.release()

This is my connection service, it takes care of all network related comms.

connection_service.py

from send_message_exception import SendMessageException
from not_connected_exception import NotConnectedException

import socket
import time
import multiprocessing
import struct

class ConnectionService():

    MAX_QUEUE_SIZE = 1

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.socket = None

        manager = multiprocessing.Manager()

        # The queue to put messages to send on
        self.send_message_queue = manager.Queue(self.MAX_QUEUE_SIZE)

        # The queue received messages go onto
        self.receive_message_queue = manager.Queue(self.MAX_QUEUE_SIZE)

        # A queue which indicates if the service is connected or not
        self.is_connected_queue = manager.Queue(self.MAX_QUEUE_SIZE)

        # A queue which indicates if the service is trying to connect
        self.pending_connection_queue = manager.Queue(self.MAX_QUEUE_SIZE)

        # A queue to stop sending activity
        self.stop_send_queue = manager.Queue(self.MAX_QUEUE_SIZE)

        # A queue to stop receiving activity
        self.stop_receive_queue = manager.Queue(self.MAX_QUEUE_SIZE)

    # Connect to the server
    # 1) If already connected - return
    # 2) If pending connection - return
    # 3) Start the network thread - don't return until the connection status is pending            
    def connect(self):
        if self.is_connected():
            return
        elif not self.pending_connection_queue.empty():
            return
        else:
            self.network_thread = multiprocessing.Process(target=self.start_network_comms)
            self.network_thread.start()

            #Give thread time to sort out queue
            while self.pending_connection_queue.empty():
                continue

    # Start network communications
    # Mark connection status as pending via queue. Clear stop queues.
    # Get socket for connection, mark as connected via queue.
    # Start Send + Receive message queues with socket as argument
    def start_network_comms(self):
            self.pending_connection_queue.put("CONNECTING")
            self.clear_queue(self.stop_send_queue)
            self.clear_queue(self.stop_receive_queue)

            self.socket = self.connect_to_server(self.host, self.port)

            self.is_connected_queue.put("CONNECTED")
            self.pending_connection_queue.get()

            print ("Connected to server...")

            receive_message_thread = multiprocessing.Process(target=self.receive_message, args=(self.socket,))
            receive_message_thread.start()

            send_message_thread = multiprocessing.Process(target=self.send_message_to_server, args=(self.socket,))
            send_message_thread.start()

    # Return true if connected queue has a value
    def is_connected(self):
        return not self.is_connected_queue.empty()

    # Put message on stop queues to end send / receive threads
    # Clear connected state queues
    def disconnect(self):
        print ("Disconnecting...")

        self.stop_receive_queue.put("")
        self.stop_send_queue.put("")

        self.clear_queue(self.pending_connection_queue)
        self.clear_queue(self.is_connected_queue)

        print ("Connection closed")

    # Send a message
    # If connected and send queue isn't full - add message to send queue
    # Raise exception if not connected
    def send_message(self, message):
        if self.is_connected():
            if self.send_message_queue.full():
                print ("Send message queue full...")
                return
            self.send_message_queue.put(message)
        else:
            raise NotConnectedException("Not connected to server...")

    # Send message to server
    # If send queue isn't empty, send the message length + message (expects binary data) to server
    # If exception while sending and the stop queue isn't empty - disconnect
    def send_message_to_server(self, socket):
        while self.stop_send_queue.empty():
            while not self.send_message_queue.empty():
                print ("Message found on queue...")
                try:
                    message = self.send_message_queue.get()
                    message_size = len(message)
                    print (f"Message: {message_size}")
                    socket.sendall(struct.pack(">L", message_size) + message)
                except Exception as e:
                    if not self.stop_send_queue.empty():
                        return
                    print (f"\nException sending message:\n\n{e}")
                    self.disconnect()

    # Get a message
    # If the receive queue isn't empty - return a message
    def get_message(self):
        if not self.receive_message_queue.empty():
            return self.receive_message_queue.get()

        return None

    # Receive messages from socket
    # Read data from socket according to the pre-pended message length
    def receive_message(self, socket):
        data = b""
        payload_size = struct.calcsize(">L")

        print ("Listening for messages...")
        while self.stop_receive_queue.empty():
            #Get message size
            try:
                while len(data) < payload_size:
                    data += socket.recv(4096)

                packed_msg_size = data[:payload_size]
                data = data[payload_size:]
                msg_size = struct.unpack(">L", packed_msg_size)[0]

                print ("Received message size:")
                print (msg_size)

                #Get message
                while len(data) < msg_size:
                    data += socket.recv(4096) 

                message = data[:msg_size]       
                data = data[msg_size:]   

                print (message)

                if self.receive_message_queue.qsize() >= self.MAX_QUEUE_SIZE or self.receive_message_queue.full():
                    continue

                self.receive_message_queue.put(message)

            except Exception as e:
                print (f"\nException while receiving messages: {e}\n\n")
                break

        print ("\nDisconnecting...\n\n")
        self.disconnect()

    # Connect to the server
    def connect_to_server(self, host, port, wait_time=1):
        try:
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client_socket.connect((host, port))
            return client_socket
        except Exception:
            print (f"Couldn't connect to remote address, waiting {wait_time} seconds to retry")
            time.sleep(wait_time)
            return self.connect_to_server(host, port, wait_time * 1)

    # Clear messages from the supplied queue (should live somewhere else)
    def clear_queue(self, queue):
        while not queue.empty():
            queue.get()
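
The wire format used by `send_message_to_server` and `receive_message` — a 4-byte big-endian length prefix followed by the payload — can be sketched in isolation. This is a minimal sketch over a local `socket.socketpair()`; the helper names are my own, not part of the project:

```python
import socket
import struct

HEADER = ">L"  # 4-byte big-endian unsigned length, as in the original code

def send_framed(sock, payload: bytes):
    # Prefix the payload with its length so the receiver knows where it ends
    sock.sendall(struct.pack(HEADER, len(payload)) + payload)

def recv_exact(sock, n: int) -> bytes:
    # Loop until exactly n bytes have arrived; recv may return short reads
    data = b""
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            raise ConnectionError("socket closed mid-message")
        data += chunk
    return data

def recv_framed(sock) -> bytes:
    header = recv_exact(sock, struct.calcsize(HEADER))
    (size,) = struct.unpack(HEADER, header)
    return recv_exact(sock, size)

a, b = socket.socketpair()
send_framed(a, b"hello frame")
print(recv_framed(b))  # b'hello frame'
a.close()
b.close()
```

Asking `recv` for only `n - len(data)` bytes keeps the start of the next message out of the current read; the original code achieves the same thing by carrying the leftover bytes in `data` between loop iterations.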

not_connected_exception.py

class NotConnectedException(Exception):
    def __init__(self, message):
        super().__init__(message)

And a small test server just to test receiving messages..

test_server.py

import socket
import sys
import struct

HOST = "127.0.0.1"
PORT = 11000

def main():
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    print('Socket created')
    s.bind((HOST,PORT))
    print('Socket bind complete')

    while True:
        s.listen(10)
        try:
            print('Socket now listening')

            conn,addr=s.accept()

            data = b""
            payload_size = struct.calcsize(">L")
            print("payload_size: {}".format(payload_size))
            while True:
                while len(data) < payload_size:
                    data += conn.recv(4096)

                packed_msg_size = data[:payload_size]
                data = data[payload_size:]
                msg_size = struct.unpack(">L", packed_msg_size)[0]
                print("msg_size: {}".format(msg_size))
                while len(data) < msg_size:
                    data += conn.recv(4096)
                frame_data = data[:msg_size]
                data = data[msg_size:]

        except Exception as e:
            print("Whoops...")
            print (e)

if __name__ == '__main__':
    main()

