#StackBounty: #python #ssl #websocket #autobahn Connect websocket-client to Autobahn TLS server, ssl options

Bounty: 50

I want to connect this client

to this server

I don’t understand which SSL options would work. I tried CERT_NONE and PROTOCOL_TLS with no success.
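
For a server with a self-signed certificate, the usual choices are either to skip verification (testing only) or to trust the server's certificate explicitly. The sketch below uses only the standard `ssl` module. The dictionary keys are the ones websocket-client accepts in its `sslopt` argument; the URL and the helper names are placeholders, not taken from the question. Note also that a TLS endpoint needs a `wss://` URL rather than `ws://`.

```python
import ssl


def insecure_sslopt():
    """Options that disable certificate and hostname checks (testing only)."""
    return {"cert_reqs": ssl.CERT_NONE, "check_hostname": False}


def pinned_sslopt(ca_file):
    """Options that verify the server against a given CA or self-signed cert."""
    return {"cert_reqs": ssl.CERT_REQUIRED, "ca_certs": ca_file}


def insecure_context():
    """Equivalent standard-library context.

    check_hostname has to be cleared before verify_mode can be CERT_NONE.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


# Hypothetical usage with websocket-client (not executed here):
#   import websocket
#   ws = websocket.create_connection("wss://localhost:9000",
#                                    sslopt=insecure_sslopt())
```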

Get this bounty!!!

#StackBounty: #python #python-3.x #async-await #configuration Managing python application configuration in a key-value store

Bounty: 200

For a project we’re working on, we need a central place to store the configuration for various applications. They’re all headless services running in Docker containers, so local configuration files or command-line parameters aren’t going to cut it in production.

We’ve chosen to use Consul as the central key-value store and since most of the modules are written in Python I’ve created a config wrapper to interact with it. It’s making use of the python-consul SDK for that purpose.

There are two main modes of operation:

  1. On initialization we load the current configuration values synchronously.
  2. After that a background monitoring job gets kicked off which executes a callback whenever a key changes.

For the second part asyncio is used since Consul provides a long-polling API which blocks key queries until there is an update to the value (or a timeout has elapsed). python-consul provides an asyncio adapter which makes use of aiohttp.
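
The long-polling pattern described above can be sketched without a running Consul server. `FakeKV` is a hypothetical in-memory stand-in for the blocking query, not part of python-consul; it only mimics the behaviour of returning once the index has moved past the one supplied, or once a timeout elapses.

```python
import asyncio


class FakeKV:
    """Hypothetical in-memory stand-in for a blocking key query."""

    def __init__(self):
        self.index = 1
        self.value = "bar"

    def put(self, value):
        self.index += 1
        self.value = value

    async def get(self, index=None, wait=0.2):
        """Return (index, value); block while the index is unchanged, up to `wait` seconds."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + wait
        while index is not None and self.index <= index and loop.time() < deadline:
            await asyncio.sleep(0.01)
        return self.index, self.value


async def watch(kv, index, on_change, rounds):
    """Re-issue the blocking query and fire the callback when the index changes."""
    for _ in range(rounds):
        old_index = index
        index, value = await kv.get(index=index)
        if index != old_index:
            on_change(value)


async def demo():
    kv = FakeKV()
    index, _ = await kv.get()          # capture the current index first
    seen = []
    watcher = asyncio.ensure_future(watch(kv, index, seen.append, rounds=3))
    await asyncio.sleep(0.05)
    kv.put("baz")                      # simulated external update
    await watcher
    return seen
```

Running `asyncio.run(demo())` collects the single update; the later rounds simply time out and loop again, which is what keeps a long-poll watcher alive between changes.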

Since this is my first time working with Python and asyncio, I’m looking for feedback on best practices around its use. Any other feedback is welcome as well.

Implementation (kv_consul.py):

You'll need: pip install python-consul aiohttp

This implements the interface to the Consul key-value store (http://consul.io)
from typing import List, Callable, Coroutine, Iterable, Union, Tuple
from urllib.parse import urlparse
import logging
import asyncio
import threading

import consul
import consul.aio

class BackgroundTask:
    def run(self, coro: Callable[..., Coroutine], args: Iterable, done_callback: Callable=None):
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, self._task_runner, coro, args, done_callback)

    def _task_runner(self, coro: Callable[..., Coroutine], args: Iterable, done_callback: Callable):
        # the executor thread has no event loop of its own, so create and install one
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            fut = asyncio.ensure_future(coro(*args))
            if done_callback:
                fut.add_done_callback(done_callback)

            loop.run_until_complete(fut)
        finally:
            loop.close()


class KvStoreConfig:
    CONSUL_DEFAULT_SCHEME = 'http'

    def __init__(self, keys: List[str], kvstore_endpoint: str=None):
        self.config_keys = keys

        args = {}
        try:
            if kvstore_endpoint:
                if '//' not in kvstore_endpoint:
                    kvstore_endpoint = '//' + kvstore_endpoint
                parts = urlparse(kvstore_endpoint, scheme=self.CONSUL_DEFAULT_SCHEME)
                if parts.hostname:
                    args['host'] = parts.hostname
                if parts.port:
                    args['port'] = parts.port
                if parts.scheme:
                    args['scheme'] = parts.scheme
        except Exception:
            logging.exception("Failed to parse Consul endpoint '{}'".format(str(kvstore_endpoint)))

        self.consul_args = args
        self.consul = consul.Consul(**self.consul_args)

    def create_if_not_present(self, full_key: str, value: Union[str, bytes]) -> bool:
        return self.consul.kv.put(full_key, value, cas=0)

    def get_source(self) -> str:
        return "Consul@"+self.consul.http.base_uri

    @staticmethod
    def _decode_consul_data_value(data):
        if data is None:
            return None

        val = data['Value']
        if isinstance(val, str):
            return val
        if isinstance(val, bytes):
            return val.decode()
        return str(val)

    def __getitem__(self, item: str) -> Union[str, None]:
        index, data = self.consul.kv.get(item)
        return self._decode_consul_data_value(data)

    def start_monitoring(self, change_callback: Callable[[str], None]) -> bool:
        monitoring_started_event = threading.Event()
        BackgroundTask().run(self._monitor, [change_callback, monitoring_started_event])
        return monitoring_started_event.wait(5)

    async def _monitor(self, change_callback: Callable[[str], None], monitoring_started_event: threading.Event) -> None:
        loop = asyncio.get_event_loop()
        c = consul.aio.Consul(loop=loop, **self.consul_args)

        # get the current indices for each key
        futures = [asyncio.ensure_future(self._get_single_key_index(c, k), loop=loop) for k in self.config_keys]
        results = await asyncio.gather(*futures)
        index_map = {tup[0]: tup[1] for tup in results}

        # at this point we've captured the current index for each key, so even if the key gets modified before the
        # individual monitoring futures are executed we can deal with it since Consul will return immediately with
        # the updated value
        monitoring_started_event.set()

        # start monitoring all keys based on the last known index
        awaitables = [self._monitor_single_key(c, k, i, change_callback) for k, i in index_map.items()]
        # block forever - ensures that the event loop keeps running
        await asyncio.wait([asyncio.ensure_future(a, loop=loop) for a in awaitables])

    async def _monitor_single_key(self, c: consul.aio.Consul, key: str, index: str, change_callback: Callable) -> None:
        while True:
            old_index = index
            index, data = await c.kv.get(key, index)
            if old_index != index:
                change_callback(key, self._decode_consul_data_value(data))

    async def _get_single_key_index(self, c: consul.aio.Consul, key: str) -> Tuple[str, str]:
        index, data = await c.kv.get(key)
        return key, index

Implementation notes:

  • The KvStoreConfig interface is the way it is because it’s used by a more generic config wrapper which also supports loading config settings from the command line and from config files (for development, testing and debugging purposes). The idea is that it can be swapped for another implementation if needed (in case we decide to stop using Consul).
  • The BackgroundTask is a bit of a crutch since asyncio needs a thread driving an event loop. Since none of the existing application modules are written around asyncio I couldn’t run the event loop on the main thread so had to fork it off to a background thread
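
One alternative to the executor-based approach in the second note is a dedicated daemon thread that owns the event loop, with work handed over through `asyncio.run_coroutine_threadsafe`. This is a sketch, not the author's implementation; `LoopThread` and `add_later` are hypothetical names used only for illustration.

```python
import asyncio
import threading


class LoopThread:
    """Runs one asyncio event loop on a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit(self, coro):
        """Schedule a coroutine from any thread; returns a concurrent.futures.Future."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def stop(self):
        """Ask the loop to stop, wait for the thread, then release the loop."""
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def add_later(a, b):
    """Trivial coroutine standing in for a monitoring job."""
    await asyncio.sleep(0.01)
    return a + b
```

The future returned by `submit` can be waited on from the synchronous side with `.result(timeout=...)`, which would replace the hand-rolled `threading.Event` handshake for start-up errors as well.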

Integration tests:

Note: This test fixture requires internet access and a working docker install in order
      to spin up the consul test container.
      requires: pip install python-consul docker
from unittest import TestCase
import socket
import docker
import time
import consul
import threading
import json

from kv_consul import KvStoreConfig

class TestConsulServer:
    def __init__(self):
        docker_client = docker.from_env(version='auto')

        self.api_port = self.find_free_port()

        config = {
            "data_dir": "/consul/data",
            "advertise_addr": "",
            "ports": {
                "http": self.api_port,
                "dns": self.find_free_port(),
                "rpc": self.find_free_port(),
                "serf_lan": self.find_free_port(),
                "serf_wan": self.find_free_port(),
                "server": self.find_free_port(),
            },
        }

        env = {'CONSUL_LOCAL_CONFIG': json.dumps(config)}

        self.consul_container = docker_client.containers.run(
            'consul', 'agent -server -bootstrap-expect=1', environment=env,
            detach=True, name='unittest_kv_consul', network_mode='host')
        start = time.time()
        while not self.is_port_open(self.api_port) and time.time() - start < 5:
            time.sleep(0.1)  # polling interval is an assumption

        if not self.is_port_open(self.api_port):
            raise Exception('Timed out while waiting for Consul to start up')

        while "cluster leadership acquired" not in str(self.consul_container.logs()) and time.time() - start < 15:
            time.sleep(0.1)  # polling interval is an assumption

        if "cluster leadership acquired" not in str(self.consul_container.logs()):
            raise Exception('Timed out while waiting for Consul to acquire cluster leadership')

        diff = time.time() - start
        print("Consul available within {}sec".format(str(diff)))

    @staticmethod
    def is_port_open(port):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        result = sock.connect_ex(('', port))
        return result == 0

    @staticmethod
    def find_free_port():
        s = socket.socket()
        s.bind(('', 0))  # 0 means: let the OS pick one for you
        port = s.getsockname()[1]
        return port

    def shutdown(self):

class TestConsulKvStoreConfig(TestCase):
    consul_server = None
    consul = None

    @classmethod
    def setUpClass(cls):
        cls.consul_server = TestConsulServer()
        cls.consul = consul.Consul(port=cls.consul_server.api_port)

    @classmethod
    def tearDownClass(cls):
        cls.consul_server.shutdown()

    @classmethod
    def create_or_update_keys(cls, kv: dict):
        for k in kv:
            cls.consul.kv.put(k, kv[k])

    @classmethod
    def delete_keys(cls, kv: dict):
        for k in kv:
            cls.consul.kv.delete(k)

    def test_get_known_keys(self):
        kv = {
            'foo': 'bar',
            'foo/sub': '123',
            'sub/foo/bar_dummy': 'here'
        }

        self.create_or_update_keys(kv)
        try:
            c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))
            self.assertEqual('bar', c['foo'])
            self.assertEqual('123', c['foo/sub'])
            self.assertEqual('here', c['sub/foo/bar_dummy'])
        finally:
            self.delete_keys(kv)

    def test_get_unknown_key(self):
        c = KvStoreConfig({}, 'localhost:' + str(self.consul_server.api_port))
        self.assertEqual(None, c['something'])

    def test_create_if_not_present_creates_new_key(self):
        c = KvStoreConfig({}, 'localhost:' + str(self.consul_server.api_port))
            self.assertTrue(c.create_if_not_present("something/new", "hello"))
            self.assertEqual("hello", c['something/new'])

    def test_create_if_not_present_does_not_change_existing_key(self):
        kv = {'foo': 'bar'}

            c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))
            self.assertFalse(c.create_if_not_present("foo", "hello"))
            self.assertEqual("bar", c['foo'])

    class _KeyUpdateHandler:
        def __init__(self):
            self.updated_key = None
            self.updated_value = None
            self.update_event = threading.Event()

        def __call__(self, *args, **kwargs):
            self.updated_key = args[0]
            self.updated_value = args[1]
            self.update_event.set()

    def test_monitoring_existing_key_update(self):
        kv = {'foo': 'bar'}
        c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))

        handler = self._KeyUpdateHandler()

        self.assertTrue(c.start_monitoring(handler), msg="Failed to start monitoring")
        self.create_or_update_keys({'foo': 'baz'})
        self.assertTrue(handler.update_event.wait(timeout=5), msg="Timeout while waiting for update callback")
        self.assertEqual('foo', handler.updated_key)
        self.assertEqual('baz', handler.updated_value)

    def test_monitoring_nonexisting_key_update(self):
        kv = {'foo': 'bar'}
        c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))
        self.assertEqual(None, c['foo'])

        handler = self._KeyUpdateHandler()

        self.assertTrue(c.start_monitoring(handler), msg="Failed to start monitoring")
        self.create_or_update_keys({'foo': 'bar'})
        self.assertTrue(handler.update_event.wait(timeout=5), msg="Timeout while waiting for update callback")
        self.assertEqual('foo', handler.updated_key)
        self.assertEqual('bar', handler.updated_value)

    def test_monitoring_deleted_key_update(self):
        kv = {'foo': 'bar'}
        c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))
        self.assertEqual('bar', c['foo'])

        handler = self._KeyUpdateHandler()

        self.assertTrue(c.start_monitoring(handler), msg="Failed to start monitoring")
        self.delete_keys(kv)
        self.assertTrue(handler.update_event.wait(timeout=5), msg="Timeout while waiting for update callback")
        self.assertEqual('foo', handler.updated_key)
        self.assertEqual(None, handler.updated_value)

    def test_get_source_http(self):
        c = KvStoreConfig({}, 'http://localhost:1234')
        self.assertEqual("Consul@http://localhost:1234", c.get_source())

    def test_get_source_https(self):
        c = KvStoreConfig({}, 'https://localhost:1234')
        self.assertEqual("Consul@https://localhost:1234", c.get_source())

    def test_get_source_default_scheme(self):
        c = KvStoreConfig({}, 'localhost:5678')
        self.assertEqual("Consul@http://localhost:5678", c.get_source())

    def test_get_source_default_port(self):
        c = KvStoreConfig({}, 'http://localhost')
        self.assertEqual("Consul@http://localhost:8500", c.get_source())

    def test_get_source_default_scheme_port(self):
        c = KvStoreConfig({}, 'localhost')
        self.assertEqual("Consul@http://localhost:8500", c.get_source())

    def test_get_source_default_all(self):
        c = KvStoreConfig({})
        self.assertEqual("Consul@http://127.0.0.1:8500", c.get_source())

    def test_get_source_ip(self):
        c = KvStoreConfig({}, '')
        self.assertEqual("Consul@", c.get_source())

Test notes:

  • Fires up a temporary docker container to test with


#StackBounty: #python #signal-processing #sympy Implementing convolution using SymPy

Bounty: 100

I started using SymPy recently, and I implemented convolution using it.

from sympy import Symbol, integrate

def convolve(f, g, x, lower_limit, upper_limit):
    y = Symbol('y')  # integration variable
    h = g.subs(x, x - y)
    return integrate(f.subs(x, y) * h, (y, lower_limit, upper_limit))

It seems to work for a few tests I’ve done.

Would like to know what you think of it, any improvements are appreciated.


#StackBounty: #python #linux #process #multiprocessing #spawn Multiprocessing on linux works with "spawn" only?

Bounty: 50

Problem description
I adjusted the code from this answer a little bit (see below). However, when running this script on Linux (command line: python script_name.py), it prints jobs running: x for all the jobs but then seems to get stuck. When I use the spawn method (mp.set_start_method('spawn')), it works fine and immediately starts printing the value of the counter variable (see the listener function).


  • Why does it work only when spawning processes?
  • How can I adjust the code so it works with fork (because it’s probably faster)?


import io
import csv
import multiprocessing as mp


def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

def worker(genome_row, q):
    complete_data = []
    #data processing
    #ftp connection to retrieve data
    return complete_data

def listener(q):
    '''listens for messages on the q, writes to file. '''
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while 1:
        m = q.get()
        counter +=1
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)

if __name__ == "__main__":
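
The following sketch does not diagnose the hang. It only shows one way to make the start method explicit and to keep the listener out of the worker pool, so that the two start methods can be compared side by side. The body of `worker` is a placeholder, and `SENTINEL`, `listen_to_file` and `run` are hypothetical names; `run` should be called from under an `if __name__ == "__main__":` guard in a script file.

```python
import io
import multiprocessing as mp

SENTINEL = "kill"  # same shutdown marker the question uses


def worker(row, q):
    """Placeholder job: turn one row into output lines and hand them to the listener."""
    lines = ["{}={}".format(k, v) for k, v in sorted(row.items())]
    q.put(lines)
    return len(lines)


def listener(q, out):
    """Drain the queue into `out` until the sentinel arrives; return the batch count."""
    counter = 0
    while True:
        m = q.get()
        if m == SENTINEL:
            break
        counter += 1
        for line in m:
            out.write(line + "\n")
    return counter


def listen_to_file(q, path):
    """Process entry point: run the listener against a file on disk."""
    with io.open(path, "w", encoding="utf-8") as f:
        listener(q, f)


def run(rows, path, method="spawn"):
    """Run workers in a pool and the listener in its own process."""
    ctx = mp.get_context(method)  # "fork" or "spawn", chosen explicitly
    with ctx.Manager() as manager:
        q = manager.Queue()
        writer = ctx.Process(target=listen_to_file, args=(q, path))
        writer.start()
        with ctx.Pool() as pool:
            jobs = [pool.apply_async(worker, (row, q)) for row in rows]
            results = [job.get() for job in jobs]  # re-raises worker errors
        q.put(SENTINEL)
        writer.join()
    return results
```

Calling `job.get()` on every result matters for debugging: an exception raised inside a worker is otherwise swallowed by `apply_async`, which can look like a hang.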


#StackBounty: #centos #http #python CentOS and multiple python website developers

Bounty: 50

I need to support multiple (~100) users, each with their own website, on a CentOS-based web server. They need to be able to use Python (v2 and v3) along with Django. I understand that a systemctl restart is required for Apache; that can be arranged with a cron job. However, I have no idea about the other tips, tricks and requirements on the admin side. Is there a website that would be of use to me in setting up the server? I understand that each of them could run their own web server (SimpleHTTPServer), but that looks very messy to me.

I would be grateful for any help regarding the issue.


#StackBounty: #python #shell #subprocess #pty Use python's pty to create a live console

Bounty: 50

I’m trying to create an execution environment/shell that runs remotely on a server and streams stdout, stderr and stdin over a socket to be rendered in a browser. So far I have tried using subprocess.run with a PIPE. The problem is that I only get the stdout after the process has completed. What I want is a line-by-line, pseudo-terminal sort of implementation.

My current implementation


def greeter():
    for _ in range(10):
        print('hello world')

greeter()


and in the shell

>>> import subprocess
>>> result = subprocess.run(['python3', 'test.py'], stdout=subprocess.PIPE)
>>> print(result.stdout.decode('utf-8'))
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world

If I try to attempt even this simple implementation with pty, how would I do it?
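
A minimal sketch of one way to do this with the standard `pty` module on a Unix-like system. The child is attached to the slave end of a pseudo-terminal, so it behaves as if writing to a console, and the parent reads the master end as output arrives. `stream_lines` is a hypothetical helper name; forwarding each line to a socket is left out.

```python
import os
import pty
import subprocess
import sys


def stream_lines(cmd):
    """Run cmd attached to a pseudo-terminal and yield its output line by line."""
    master_fd, slave_fd = pty.openpty()
    proc = subprocess.Popen(cmd, stdin=slave_fd, stdout=slave_fd,
                            stderr=slave_fd, close_fds=True)
    os.close(slave_fd)  # the parent only needs the master end
    buffer = b""
    try:
        while True:
            try:
                chunk = os.read(master_fd, 1024)
            except OSError:
                break  # on Linux the master raises EIO once the child has exited
            if not chunk:
                break  # other platforms signal end of output with an empty read
            buffer += chunk
            while b"\n" in buffer:
                line, buffer = buffer.split(b"\n", 1)
                # the terminal layer turns "\n" into "\r\n", so drop the "\r"
                yield line.rstrip(b"\r").decode("utf-8", errors="replace")
        if buffer:
            yield buffer.decode("utf-8", errors="replace")
    finally:
        os.close(master_fd)
        proc.wait()
```

Iterating with `for line in stream_lines(['python3', 'test.py']): ...` delivers each line as soon as the child prints it, because Python line-buffers stdout when it is connected to a terminal.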


#StackBounty: #python #pandas Need rules of thumb for out of core larger than ram dataset on a laptop

Bounty: 100

I’m totally new to larger-than-RAM datasets, but I have two CSV files of about 100 GB each, with around 300 million rows each.

What I am looking for is something like: don’t go analyzing more than 1 terabyte of data on a laptop. Or don’t analyze data that is more than 10x your ram or the wait times will prove frustrating. This is what I mean by rules of thumb.

I have a Mac laptop with an i5, 8 GB of RAM and an SSD. Is it reasonable to process this data (two files of 100 GB each) with either dask or blaze in Python?

I have tried, and I could read in the CSV, but simple trials like dropping one column or finding the length of the data frame take at least an hour for the drop (I gave up waiting) and half an hour for the length() (that one did finish). That is way too slow to be productive. I then changed the data format to Parquet and, amazingly, the data is down to just 10 GB. This is the only way that the length() command ever finished.

So my question is: are these response times normal experience in general given my hardware and csv size? Are my expectations for what dask can do way too high? Any rough guidelines on how to troubleshoot or is this clearly underpowered hardware?
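
As a baseline for comparison, a single-pass aggregate such as a row count or a column sum never needs the file in memory at all. The sketch below streams a CSV with the standard `csv` module; `summarize` and the column name are hypothetical, and timing this kind of pass on a sample of the real file gives a rough lower bound on what any CSV-based tool has to spend on parsing.

```python
import csv
import io


def summarize(lines, column):
    """Stream a CSV once, returning (row_count, sum of `column`) in constant memory."""
    reader = csv.DictReader(lines)
    count = 0
    total = 0.0
    for row in reader:
        count += 1
        total += float(row[column])
    return count, total


# Small in-memory example; for a real file, pass open(path, newline="") instead.
sample = io.StringIO("a,b\n1,2\n3,4\n")
print(summarize(sample, "b"))
```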


#StackBounty: #python #python-3.x #array #complexity Maximum consecutive subarray sum for a variable sliding window size

Bounty: 50

The Problem:

Given an array of integers L find the largest sum of a consecutive subarray of size k or less.


  • 2 <= len(L) <= 10000

  • 3 <= k <= len(L)

  • each element in the array will have an absolute value no more than 200

  • there will be at least one positive integer in the array L


L=[-200, 91, 82, 43], k=3, the result should be 216

L=[41, 90, -62, -16, 25, -61, -33, -32, -33], k=5, the result should be 131

The Implementation:

Initially, I started with brute-forcing the problem, probing all sizes of the sliding window starting from k to 1. This immediately got me into “time limit exceeded” situation.

The idea implemented below is based on picking out only positive integers initially. For every positive integer, we are looking if the following integers in a window are contributing to the current sum or not, cutting off situations when current sum drops below zero:

def maximum_consecutive_subarray(L, k):
    global_max = 0

    for index in range(len(L) - 1, -1, -1):
        if L[index] < 0:  # skipping all negative values
            continue

        sliding_index = index
        positive_count = positive_sum = 0

        while sliding_index >= 0 and positive_count < k:
            if L[sliding_index] >= 0:
                positive_count += 1
                positive_sum += L[sliding_index]

                global_max = max(global_max, positive_sum)
            else:
                negative_count = 1
                negative_sum = L[sliding_index]

                while sliding_index - 1 >= 0 > L[sliding_index - 1]:  # iterating over all consecutive negative values
                    negative_count += 1
                    negative_sum += L[sliding_index - 1]
                    sliding_index -= 1

                    if positive_count + negative_count == k:  # break if sliding window size limit reached
                        break

                if positive_sum + negative_sum > 0:  # if we still contribute to the maximum value
                    positive_count += negative_count
                    positive_sum += negative_sum
                else:
                    break  # exit this window if nothing was to contribute

            sliding_index -= 1

    return global_max

I would like to know if we can further improve the solution time complexity wise and would appreciate any other feedback.
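
One standard way to reach linear time, offered here as an alternative technique rather than a tweak of the code above, combines prefix sums with a monotonic deque. The sum of L[i:j] is prefix[j] - prefix[i], so for each end j the best window of length at most k starts at the index i in [j - k, j - 1] with the smallest prefix value, and the deque keeps that minimum available in amortized O(1). The function name is hypothetical.

```python
from collections import deque


def max_sum_at_most_k(L, k):
    """Largest sum of a non-empty consecutive subarray of length <= k, in O(n)."""
    prefix = [0]
    for value in L:
        prefix.append(prefix[-1] + value)

    best = L[0]
    starts = deque()  # candidate start indices, prefix values strictly increasing
    for j in range(1, len(prefix)):
        # j - 1 becomes a candidate start; drop candidates that are no better
        while starts and prefix[starts[-1]] >= prefix[j - 1]:
            starts.pop()
        starts.append(j - 1)
        # drop starts that would make the window longer than k
        while starts[0] < j - k:
            starts.popleft()
        best = max(best, prefix[j] - prefix[starts[0]])
    return best


print(max_sum_at_most_k([-200, 91, 82, 43], 3))                          # 216
print(max_sum_at_most_k([41, 90, -62, -16, 25, -61, -33, -32, -33], 5))  # 131
```

Each index is pushed and popped at most once, so the total work is O(n) regardless of k.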


#StackBounty: #machine-learning #logistic #python #naive-bayes How to explain low performance of naive Bayes on a dataset

Bounty: 50

I’m working on a project from Udacity’s ML Nanodegree, Finding Donors.

I’m making the initial test using three algorithms:

LogisticRegression -> RED
GaussianNB -> Green
AdaBoostClassifier -> Blue

This is the result I’m getting:

[image omitted]

I wonder why NB performs so poorly.
Here is some information about the dataset:

1) Initial numerical features are not highly correlated:

[image omitted]

2) The categorical features were encoded, increasing the number of features up to 100 and making the dataset more sparse.


I also tried decision trees, and these perform poorly too: the best case is around 0.45 at depth 8, and beyond that the model starts showing high variance. That could explain why AdaBoost works well here, since its main advantage is its capacity to improve the variance issue.

[image omitted]

I still wonder why NB and DT perform so poorly on this dataset compared with logistic regression, which is a simple model too.


#StackBounty: #python #hidden-markov-models #markov-chains #markov-models #hmmlearn Decoding sequences in a GaussianHMM

Bounty: 100

I’m playing around with Hidden Markov Models for a stock market prediction problem. My data matrix contains various features for a particular security:

01-01-2001, .025, .012, .01
01-02-2001, -.005, -.023, .02

I fit a simple GaussianHMM:

from hmmlearn.hmm import GaussianHMM
mdl = GaussianHMM(n_components=3,covariance_type='diag',n_iter=1000)

With the model (λ), I can decode an observation vector to find the most likely hidden state sequence corresponding to the observation vector:

print mdl.decode(test[0:5,1:])
(72.75, array([2, 1, 2, 0, 0]))

Above, I’ve decoded the hidden state sequence of an observation vector Ot = (O1, O2, …, Od) which contains the first five instances in a test set. I’d like to estimate the hidden state of the sixth instance in the test set. The idea is to iterate over a discrete set of possible feature values for the sixth instance, and select the observation sequence Ot+1 with highest likelihood argmax = P(O1, O2, …, Od+1 | λ ). Once we observe the true feature values of Od+1, we can shift the sequence (of length 5) by one and do it all over again:

    l = 5
    for i in xrange(len(test)-l):
        values = []
        for a in arange(-0.05,0.05,.01):
            for b in arange(-0.05,0.05,.01):
                for c in arange(-0.05,0.05,.01):
     print max(enumerate(values),key=lambda x: x[1])

The problem is that when I decode the observation vector Ot+1, the prediction with the highest likelihood is almost always the same (e.g. the estimate with highest likelihood always has feature values for Od+1 that equal [ 0.04 0.04 0.04] and is hidden state [0]):

(555, (74.71248518927949, array([2, 1, 2, 0, 0, 0]))) [ 0.04  0.04  0.04]
(555, (69.41963358191555, array([2, 2, 0, 0, 0, 0]))) [ 0.04  0.04  0.04]
(555, (77.11516871816922, array([2, 0, 0, 0, 0, 0]))) [ 0.04  0.04  0.04]

It’s entirely possible that I’m misunderstanding the purpose of mdl.decode, and thus using it incorrectly. If that’s the case, how best can I go about iterating over possible values of Od+1, and then maximizing P(O1, O2, …, Od+1 | λ)?
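
If the goal is the hidden state of the next instance rather than its feature values, one alternative to the grid search is to propagate the current state distribution through the transition matrix: P(s_{t+1} = j) = sum_i P(s_t = i) * A[i][j]. The sketch below is plain Python with made-up numbers; with hmmlearn, the matrix would come from the fitted model's `transmat_` attribute and the current distribution from the last row of `predict_proba`. This is offered as an assumption about what is wanted, not as a correction of the approach above.

```python
def next_state_distribution(state_probs, transmat):
    """Return P(s_{t+1} = j) = sum_i P(s_t = i) * A[i][j] for every state j."""
    n = len(transmat)
    return [sum(state_probs[i] * transmat[i][j] for i in range(n))
            for j in range(n)]


# Hypothetical 3-state transition matrix (each row sums to 1).
transmat = [
    [0.8, 0.1, 0.1],
    [0.2, 0.7, 0.1],
    [0.3, 0.3, 0.4],
]
current = [1.0, 0.0, 0.0]  # the model is certain it is in state 0 now

probs = next_state_distribution(current, transmat)
most_likely = max(range(len(probs)), key=probs.__getitem__)
print(probs, most_likely)
```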
