#StackBounty: #python #multithreading #core #jupyter-lab Control number of CPU using in jupyterlab server

Bounty: 50

I’m using jupyterlab and I know that I have 12 cores available.
At the moment I use only 1 and I would like to use more.
I have tried to changed the number I use by write this in the terminal:

export JULIA_NUM_THREADS=7

but then when I print:

import threading
threading.activeCount()
>>>5

how can I make more CPU available for my jupyterlab notebook?
This is really not my field so I’m sorry if is smething really simple I just don’t understand what am I doing wrong and where to start from.


Get this bounty!!!

#StackBounty: #python #multithreading #optimization #parallel-processing #multiprocessing Multiprocessing in django and Python code

Bounty: 50

I am trying to implement multiprocessing in my application.

The scenario is :
From GUI, when i click "Run" button control comes to a python function(which is not a main function).

Now in this function I am running loop and reading/executing multiple file one at a time. I want this to happen in parallel.

But as multiprocessing.process() need __name__ ='__main__', my function mentioned in "target = function name" in multiprocessing() is not being invoked.

How can I make it happen. If multiprocessing seems wrong way then any alternative way to improve code performance?

Adding Sample code(please note that this is just a psudo code where i have added high level code to understand the flow, please excuse any syntax error) :

urls.py file:

from django.urls import path
from textapp import views

urlpatterns = [
    path('execute/',views.functiontomultiprocess),
    ...
    other urls
    ]

views.py:

def functiontomultiprocess(request):
for doc in alldocs:
   p = multiprocess.Process(function2)
   p.start()
   p.join()


Get this bounty!!!

#StackBounty: #ios #swift #multithreading #concurrency Safety of using an empty reference instance across multiple threads

Bounty: 150

Background

I have a class Data that stores multiple input parameters and a single output value.

The output value is recalculated whenever one of the input parameters is mutated.

The calculation takes a non-trivial amount of time so it is performed asynchronously.

If one of the input parameters changes during recalculation, the current calculation is cancelled, and a new one is begun.

The cancellation logic is implemented via a serialized queue of calculation operations and a key (reference instance) (Data.key). Data.key is set to a new reference instance every time a new recalculation is added to the queue. Also only a single recalculation can occur at a time — due to the queue. Any executing recalculation constantly checks if it was the most recently initiated calculation by holding a reference to both the key that what was created with it when it was initiated and the currently existing key. If they are different, then a new recalculation has been queued since it began, and it will terminate.

This will trigger the next recalculation in the queue to begin, repeating the process.

The basis for my question

The reassignment of Data.key is done on the main thread.

The current calculation constantly checks to see if its key is the same as the current one. This means another thread is constantly accessing Data.key.

Question(s)

Is it safe for me to leave Data.key vulnerable to being read/written to at the same time?

Is it even possible for a property to be read and written to simultaneously?


Get this bounty!!!

#StackBounty: #python #linux #multithreading How Can I Update a String's Value without Changing Its Position with a Thread Job? [Py…

Bounty: 50

I will have two jobs in my script. Once a job started other will run async. I used Thread for this. And this thread will return some info while other count that info.

What I want to do is while counter’s value is changing, thread also be continue to run.

Display that I want:

-----------------------------------------
Count: 5
-----------------------------------------
thread keeps running...
thread keeps running...
thread keeps running...

Actually I achive this goal using curses module, but this is not exactly what I wanted. Because when I press ^C terminal contents is gone. I want them to freeze in the screen.

Code with curses:

import sys
import time
import queue
import signal
import curses
import threading


def ctrl_c_handler(*args):
    sys.exit(0)


signal.signal(signal.SIGINT, ctrl_c_handler)

MESSAGE = "thread keeps running..."


def print_func(message):
    return message


def new_window(stdscr):
    que = queue.Queue()

    curses.curs_set(False)

    y, x = stdscr.getmaxyx()

    draw = x * "-"

    i = 3
    count = 1
    while True:
        thread = threading.Thread(target=lambda q, arg1: q.put(print_func(arg1)), args=(que, MESSAGE,), daemon=True)
        thread.start()
        result = que.get()

        try:
            stdscr.addstr(0, 0, draw)
            stdscr.addstr(1, 0, f"Count: {str(count)}")
            stdscr.addstr(2, 0, draw)
            stdscr.addstr(i, 0, result)

        except curses.error:
            pass

        stdscr.refresh()
        time.sleep(0.1)

        i += 1
        count += 1

        if i == y:
            stdscr.clear()
            i = 3


curses.wrapper(new_window)

Is there a way to achive the same goal without using curses, or curses with no loss of contents?

Thank you!


Get this bounty!!!

#StackBounty: #multithreading #asynchronous #rust #rust-tokio Multithreaded Client that sends data in a queue and stores data in anothe…

Bounty: 50

I’m having difficulties in making a Tokio client that receives packets from a server and stores them in a queue for the main thread to process, while being able to send packets to the server from another queue at the same time.

I’m trying to make a very simple online game demonstration, having a game client that Sends data (it’s own modified states, like player movement) and receives data (Game states modified by other players & server, like an NPC/other players that also moved).

The idea is to have a network thread that accesses two Arcs holding Mutexes to Vec<bytes::Bytes> that store serialized data. One Arc is for IncomingPackets, and the other for OutgoingPackets. IncomingPackets would be filled by packets sent from the server to the client that would be later read by the main thread, and OutgoingPackets would be filled by the main thread with packets that should be sent to the server.

I can’t seem to receive or send packets in another thread.

The client would only connect to the server, and the server would allow many clients (which would be served individually).

The explanations around stream’s usage and implementation are not newbie-friendly, but I think I should be using them somehow.

I wrote some code, but it does not work and is probably wrong.

(My original code does not compile, so treat this as pseudocode, sorry)

Playground

extern crate byteorder; // 1.3.4
extern crate futures; // 0.3.5
extern crate tokio; // 0.2.21 

use bytes::Bytes;
use futures::future;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;

use byteorder::{BigEndian, WriteBytesExt};
use std::io;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};

//This is the SharedPackets struct that is located in the crate structures
struct SharedPackets {
    data: Mutex<Vec<bytes::Bytes>>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    let (mut r, mut w) = stream.split();

    let mut inc: Vec<bytes::Bytes> = Vec::new();
    inc.push(Bytes::from("Wow"));

    let mut incoming_packets = Arc::new(SharedPackets {
        data: Mutex::new(inc),
    });

    let mut outg: Vec<bytes::Bytes> = Vec::new();
    outg.push(Bytes::from("Wow"));
    let mut outgoint_packets = Arc::new(SharedPackets {
        data: Mutex::new(outg),
    });

    let mut local_incoming_packets = Arc::clone(&incoming_packets);
    let mut local_outgoint_packets = Arc::clone(&outgoint_packets);
    let mut rarc = Arc::new(Mutex::new(r));
    let mut warc = Arc::new(Mutex::new(w));

    tokio::spawn(async move {
        //send and receive are both async functions that contain an infinite loop
        //they basically use AsyncWriteExt and AsyncReadExt to manipulate both halves of the stream
        //send reads the queue and write this data on the socket
        //recv reads the socket and write this data on the queue
        //both "queues" are manipulated by the main thread
        let mut read = &*rarc.lock().unwrap();
        let mut write = &*warc.lock().unwrap();

        future::try_join(
            send(&mut write, &mut local_outgoint_packets),
            recv(&mut read, &mut local_incoming_packets),
        )
        .await;
    });

    loop {
        //read & write other stuff on both incoming_packets & outgoint_packets
        //until the end of the program
    }
}

async fn recv(reader: &mut ReadHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
    loop {
        let mut buf: Vec<u8> = vec![0; 4096];

        let n = match reader.read(&mut buf).await {
            Ok(n) if n == 0 => return Ok(()),
            Ok(n) => n,
            Err(e) => {
                eprintln!("failed to read from socket; err = {:?}", e);
                return Err(e);
            }
        };
    }
}

async fn send(writer: &mut WriteHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
    loop {
        //task::sleep(Duration::from_millis(300)).await;
        {
            let a = vec!["AAAA"];
            for i in a.iter() {
                let mut byte_array = vec![];
                let str_bytes = i.as_bytes();
                WriteBytesExt::write_u32::<BigEndian>(&mut byte_array, str_bytes.len() as u32)
                    .unwrap();
                byte_array.extend(str_bytes);

                writer.write(&byte_array).await?;
            }
        }
    }
}

This does not compile:

error: future cannot be sent between threads safely
   --> src/main.rs:46:5
    |
46  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:55:9
    |
52  |           let mut read = &*rarc.lock().unwrap();
    |                            -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>` which is not `Send`
...
55  | /         future::try_join(
56  | |             send(&mut write, &mut local_outgoint_packets),
57  | |             recv(&mut read, &mut local_incoming_packets),
58  | |         )
59  | |         .await;
    | |______________^ await occurs here, with `rarc.lock().unwrap()` maybe used later
60  |       });
    |       - `rarc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:52:25
    |
52  |         let mut read = &*rarc.lock().unwrap();
    |                         ^^^^^^^^^^^^^^^^^^^^^

error: future cannot be sent between threads safely
   --> src/main.rs:46:5
    |
46  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:55:9
    |
53  |           let mut write = &*warc.lock().unwrap();
    |                             -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>` which is not `Send`
54  | 
55  | /         future::try_join(
56  | |             send(&mut write, &mut local_outgoint_packets),
57  | |             recv(&mut read, &mut local_incoming_packets),
58  | |         )
59  | |         .await;
    | |______________^ await occurs here, with `warc.lock().unwrap()` maybe used later
60  |       });
    |       - `warc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
   --> src/main.rs:53:26
    |
53  |         let mut write = &*warc.lock().unwrap();
    |                          ^^^^^^^^^^^^^^^^^^^^^

I think this is the least of the problems, because I’m really new with tokio.

I could not find an example of this, do you know any performant approach to this problem?


Get this bounty!!!

#StackBounty: #python #python-3.x #multithreading Invoking parallel threads of a nslookup command in python

Bounty: 50

I’ve written below python script just to invoke parallel threads of a command nslookup linux-host01 dns-server, i’m Just trying to flood the DNS Server dns-server with multiple number of queries in parallel way.

As an argument to this script i’m passing a hostlist file example linux-host01 , linux-host02 etc.

To achieve this i’m using threading module with the below code and this works fine, However, i’m using the the threading first time hence would like to know if this can be simplified of enhanced further.

import os
import threading
from sys import argv, exit


lock = threading.Lock()

hostfile = argv[1]

lst = []
with open(hostfile, 'r') as frb:
    for line in frb:
        lst.append(line.strip("n"))

threads = []
thread_count = len(lst)

def pop_queue():
    host = None
    lock.acquire()
    if lst:
        host = lst.pop()
        lock.release()
    return host

def dequeue():
    while len(lst) != 0:
        host = pop_queue()
        if not host:
            return None
        else:
            dolookup(host)

def dolookup(name):
    while True:
        os.system("/usr/bin/nslookup %s dns-test01" %name)

for i in range(thread_count):
    t = threading.Thread(target=dequeue)
    t.start()
    print "started thread %s" %i
    threads.append(t)

[t.join() for t in threads]

Appreciate any help on this.


Get this bounty!!!

#StackBounty: #python #django #multithreading #apache #mod-wsgi Django's AppConfig.ready() seems to be called once per thread in Ap…

Bounty: 50

I have the following code in apps.py:

class Pqawv1Config(AppConfig):
    # ...
    def ready(self):
        env_run_main=os.environ.get('RUN_MAIN')
        print('IS_PRODUCTION=%s, RUN_MAIN=%s' % (settings.IS_PRODUCTION, env_run_main))
        if (not settings.IS_PRODUCTION) and (env_run_main != 'true'):
            print('Exiting because detected running in reloader.')
            return

        print('Starting up for PID=%s, argv=%s...' % (os.getpid(), sys.argv))
        # Initialization of a singleton object from a C++ DLL
        # Raise an exception if this library was already initialized in this process

In the evening I restarted the server and it printed the following in the log as expected:

[Sun Sep 15 22:50:34.928549 2019] [wsgi:error] [pid 11792:tid 1176] Starting up for PID=11792, argv=['mod_wsgi']...r

However, in the morning I noticed that something strange has happened. It looks like Apache started a new thread for the web application:

[Mon Sep 16 04:10:41.224464 2019] [wsgi:error] [pid 11792:tid 1160] Starting up for PID=11792, argv=['mod_wsgi']...r

And later:

[Mon Sep 16 07:16:21.028429 2019] [mpm_winnt:error] [pid 11792:tid 2272] AH00326: Server ran out of threads to serve requests. Consider raising the ThreadsPerChild setting

I think it’s not the issue of calling AppConfig.ready() method twice because there were requests to the website in between and they were handled well. It rather looks like Django’s AppConfig.ready() method is called once per worker thread of Apache process. Is this so? In this case, how to run the code once per process, rather than once per thread in Django powered by Apache and mod-wsgi?


Get this bounty!!!

#StackBounty: #python #multithreading #sockets #python-multithreading Python closing socket and connection in a threaded server?

Bounty: 50

We have a Python socket threaded server example. It is a slightly modified version from
https://stackoverflow.com/a/23828265/2008247. The example works and my tests confirm that it performs better than the blocking server.

But in the example, the socket and the connection objects are not closed. Both objects have close() method. (The close method on a connection is called only on Exception. I would expect it to be called for each connection when it ends.) Do we not need to somehow call them? If so, how?

#!/usr/bin/env python

import socket
import threading

class ThreadedServer():

    def __init__(self, host, port):

        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((self.host, self.port))

    def listen(self):

        self.sock.listen(5)

        while True:

            con, address = self.sock.accept()
            con.settimeout(60)
            threading.Thread(target=self.listenToClient,
                             args=(con, address)).start()

    def listenToClient(self, con, address):

        while True:
            try:
                data = con.recv(1024)
                if data:
                    # Set the response to echo back the recieved data
                    response = data
                    con.send(response)
                else:
                    raise Exception('Client disconnected')
            except:
                con.close()
                return False


def main():

    ThreadedServer('', 8001).listen()


if __name__ == "__main__":
    main()


Get this bounty!!!

#StackBounty: #java #multithreading Process multiple request from different threads in batches on one processing thread

Bounty: 50

So I have a system that uses multiple threads to process data. These data could be processed individualy but it would be better to process them in batches.

Lets assume whe have a class Data, a class OtherData and a class Processor which implements Function<List<Data>, List<OtherData>>.

To process objects of type Data from mutiple threads I designed two classes System and Evaluator.

public class System {
    private final Evaluator evalThread;
    private final Object sync = new Object();
    private Function<List<Data>, List<OtherData>> processor;
    private Map<Object, Data> inputMap;
    private Map<Object, CompletableFuture<OtherData>> futureMap;
    private List<Object> idList;

    public System() {
        processor = new Processor();
        inputMap = new HashMap<>();
        futureMap = new HashMap<>();
        idList = new LinkedList<>();

        evalThread = new Evaluator(processor, inputMap, futureMap, idList, sync);
        Thread thread = new Thread(evalThread, "EvalThread");
        thread.start();
    }

    public CompletableFuture<OtherData> process(Data data) {
        Object id = new Object();

        final CompletableFuture<OtherData> completableFuture = new CompletableFuture<>();

        synchronized (sync) {
            inputMap.put(id, data);
            futureMap.put(id, completableFuture);
            idList.add(id);

            if (idList.size() >= 32) {
                sync.notifyAll();
            }
        }

        return completableFuture;
    }
}

public class Evaluator implements Runnable {
    private final Function<List<Data>, List<OtherData>> processor;
    private final Map<Object, Data> inputMap;
    private final Map<Object, CompletableFuture<OtherData>> futureMap;
    private final List<Object> idList;
    private final Object sync;

    private AtomicBoolean keepRunning = new AtomicBoolean(true);

    public Evaluator(Function<List<Data>, List<OtherData>> processor, Map<Object, Data> inputMap, Map<Object,
                      CompletableFuture<OtherData>> futureMap, List<Object> idList, Object sync) {
        this.processor = processor;
        this.inputMap = inputMap;
        this.futureMap = futureMap;
        this.idList = idList;
        this.sync = sync;
    }

    @Override
    public void run() {
        synchronized (sync) {
            while(keepRunning.get()) {
                if (idList.size() > 0) {
                    List<Data> input = new LinkedList<>();

                    for (int i = 0; i < idList.size(); i++) {
                        input.add(inputMap.get(idList.get(i)));
                    }

                    List<OtherData> output = processor.apply(input);

                    for (int i = 0; i < idList.size(); i++) {
                        futureMap.get(idList.get(i)).complete(output.get(i));
                    }

                    idList.clear();
                    inputMap.clear();
                    futureMap.clear();
                }

                try {
                    sync.wait(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

My idea was that any one can call process with singular data but the data will (if there are enough) processed together with other Data objects.

Any suggestions for improvement or are there systems in the java-framework that would fit this task better?
Do you might see Problems according to deadlocks, etc.?


Get this bounty!!!