#StackBounty: #python #python-2.7 Python inherit magic methods based off __init__ value

Bounty: 100

Let’s imagine I have a single class X. The purpose of X is to wrap a list or dict and provide event-listening capabilities. All works well.

class X(object):
    def __init__(self, obj):
        self._obj = obj

    def __getattr__(self, name):
        # do stuff with self._obj

    def __getitem__(self, key):
        return self._obj[key]

    def __setitem__(self, key, val):
        self._obj[key] = val

    # rest of functionality ...

So this can be used to wrap a dict like so:

x = X({
    'foo' : False
})

x.listen('foo', callback)

x['foo'] = True         # triggers event

Or a list:

x = X([1,2])

x.append(1)        # triggers event
x[0] = 10          # triggers event

Great. Almost what I wanted to accomplish…

Now the current issue is that, because X wraps both list and dict objects, it can't inherit from either. This means I don't get the magic methods, such as __contains__, for free.

Which leads to code like this:

d = X({
    'foo' : True
})

if 'foo' in d:
    print 'yahoo!'

throwing a KeyError.
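That KeyError is a clue to what is happening: when a class defines neither __contains__ nor __iter__, the in operator falls back to the old sequence protocol and probes __getitem__ with 0, 1, 2, ..., which a dict-backed _obj answers with KeyError: 0. A minimal reproduction (the class name P is illustrative, not the asker's code):

```python
class P(object):
    def __init__(self, obj):
        self._obj = obj

    def __getitem__(self, key):
        return self._obj[key]

p = P({'foo': True})
try:
    'foo' in p        # Python tries p[0], p[1], ... under the hood
except KeyError as e:
    print('KeyError:', e)   # -> KeyError: 0
```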

How can I work around this without defining every magic method I need inside of X? If I did it that way, each of those definitions would need two return paths depending on whether self._obj is a list or a dict.

I thought I could do this with metaclasses at first, but that doesn't seem to be a solution, since I need access to the value being passed in to check whether it's a dict or a list.
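One metaclass-free direction, sketched under the assumption that only plain dicts and lists are wrapped (the event machinery here is simplified, not the asker's exact class): __new__ does see the wrapped value, so it can generate a subclass of both X and type(obj) on the fly, and __contains__, __iter__, __len__ and friends then come from the real dict or list:

```python
class X(object):
    _subclasses = {}  # cache: wrapped type -> generated subclass

    def __new__(cls, obj):
        base = type(obj)  # dict or list
        sub = cls._subclasses.get(base)
        if sub is None:
            # mix the wrapped type into the MRO so that __contains__,
            # __iter__, __len__, ... all come from dict/list directly
            sub = type('X_' + base.__name__, (cls, base), {})
            cls._subclasses[base] = sub
        return base.__new__(sub)

    def __init__(self, obj):
        type(obj).__init__(self, obj)  # copy the initial contents
        self._listeners = {}

    def listen(self, key, callback):
        self._listeners.setdefault(key, []).append(callback)

    def __setitem__(self, key, val):
        super(X, self).__setitem__(key, val)  # real dict/list assignment
        for cb in self._listeners.get(key, []):
            cb(key, val)

events = []
x = X({'foo': False})
x.listen('foo', lambda k, v: events.append((k, v)))
x['foo'] = True                    # triggers event
assert events == [('foo', True)]
assert 'foo' in x                  # __contains__ now works

y = X([1, 2])
assert 1 in y and len(y) == 2
```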

Get this bounty!!!

#StackBounty: #python #numpy #matplotlib #scikit-learn Plot specific points in DBSCAN in sklearn in python

Bounty: 50

I have a set of documents and I create a feature matrix from them. Then I calculate the cosine similarity between the documents and feed that cosine distance matrix to the DBSCAN algorithm. My code is as follows.

import pandas as pd
import numpy as np
from sklearn.metrics import pairwise_distances
from scipy.spatial.distance import cosine
from sklearn.cluster import DBSCAN

# Initialize some documents
doc1 = {'Science':0.8, 'History':0.05, 'Politics':0.15, 'Sports':0.1}
doc2 = {'News':0.2, 'Art':0.8, 'Politics':0.1, 'Sports':0.1}
doc3 = {'Science':0.8, 'History':0.1, 'Politics':0.05, 'News':0.1}
doc4 = {'Science':0.1, 'Weather':0.2, 'Art':0.7, 'Sports':0.1}
doc5 = {'Science':0.2, 'Weather':0.7, 'Art':0.8, 'Sports':0.9}
doc6 = {'Science':0.2, 'Weather':0.8, 'Art':0.8, 'Sports':1.0}
collection = [doc1, doc2, doc3, doc4, doc5, doc6]
df = pd.DataFrame(collection)
# Fill missing values with zeros
df.fillna(0, inplace=True)
# Get Feature Vectors
feature_matrix = df.as_matrix()

# Get cosine distance between pairs
sims = pairwise_distances(feature_matrix, metric='cosine')

db = DBSCAN(min_samples=1, metric='precomputed').fit(sims)

Now, as shown in the DBSCAN demo of sklearn, I plot the clusters. That is, instead of X I pass in sims, my cosine distance matrix.

labels = db.labels_
n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
print('Estimated number of clusters: %d' % n_clusters_)
core_samples_mask = np.zeros_like(db.labels_, dtype=bool)
core_samples_mask[db.core_sample_indices_] = True

# Plot result
import matplotlib.pyplot as plt

# Black removed and is used for noise instead.
unique_labels = set(labels)
colors = [plt.cm.Spectral(each)
          for each in np.linspace(0, 1, len(unique_labels))]
for k, col in zip(unique_labels, colors):
    if k == -1:
        # Black used for noise.
        col = [0, 0, 0, 1]

    class_member_mask = (labels == k)

    xy = sims[class_member_mask & core_samples_mask]
    plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
             markeredgecolor='k', markersize=14)

    xy = sims[class_member_mask & ~core_samples_mask]
    plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=tuple(col),
             markeredgecolor='k', markersize=6)

plt.title('Estimated number of clusters: %d' % n_clusters_)
  1. My first question is: is it correct to use sims instead of X, given that X represents coordinate values in the sklearn demo whereas sims represents cosine distance values?
  2. My second question is: is it possible to make given points red? For example, I want to color the point that represents the row [0.8, 0.0, 0.0, 0.0, 0.2, 0.9, 0.7] of feature_matrix red.
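For the second question, one way (a sketch, not the only way) is to locate the row in feature_matrix and build a per-point color list to pass to matplotlib. The column order below assumes pandas' alphabetical ordering of the keys (Art, History, News, Politics, Science, Sports, Weather); the plotting call is left as a comment so the index logic stands on its own:

```python
import numpy as np

# feature matrix for doc1..doc6, columns in pandas' alphabetical order:
# Art, History, News, Politics, Science, Sports, Weather
feature_matrix = np.array([
    [0.0, 0.05, 0.0, 0.15, 0.8, 0.1, 0.0],   # doc1
    [0.8, 0.0,  0.2, 0.1,  0.0, 0.1, 0.0],   # doc2
    [0.0, 0.1,  0.1, 0.05, 0.8, 0.0, 0.0],   # doc3
    [0.7, 0.0,  0.0, 0.0,  0.1, 0.1, 0.2],   # doc4
    [0.8, 0.0,  0.0, 0.0,  0.2, 0.9, 0.7],   # doc5
    [0.8, 0.0,  0.0, 0.0,  0.2, 1.0, 0.8],   # doc6
])

target = np.array([0.8, 0.0, 0.0, 0.0, 0.2, 0.9, 0.7])
# indices of rows equal to the target (np.isclose guards float equality)
idx = np.where(np.isclose(feature_matrix, target).all(axis=1))[0]

colors = ['blue'] * len(feature_matrix)
for i in idx:
    colors[i] = 'red'

# then, instead of one plt.plot call per cluster:
# plt.scatter(sims[:, 0], sims[:, 1], c=colors, s=100, edgecolors='k')
```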


#StackBounty: #server #package-management #python How to create a deb package from a directory of python project which also need of oth…

Bounty: 150

How do I create a deb package for distribution from a directory containing a Python project, on Ubuntu 16.04?

I did search for it, and the closest I found is this.

What I have right now is a directory containing a REST server written in Python. I want to turn this directory into a deb package so that whoever gets the package can easily install and run the REST server on their computer.

The directory structure is roughly like below:

server
|-- __init__.py
|-- main.py
`-- api
    |-- __init__.py
    `-- v1
        |-- __init__.py
        `-- resources
            |-- ...

Normally, I run this server with python main.py after starting a celery worker with celery -A server.api.v1.resources.tasks worker --loglevel=INFO.

What I want to ask now is: is there a way to convert this project into a deb package for distribution? Since I also use a celery worker to support the server, is it possible to package the directory together with the command for the worker? What I mean is that when someone installs the deb package on their Ubuntu machine, the server should run together with the worker.
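One common way to wire this up (a sketch; the package name myrest, the install path /opt/myrest and the unit names are assumptions, not taken from the question) is to ship two systemd units inside the deb, one for the celery worker and one for the server, so that installing the package gives you both services:

```ini
# myrest-worker.service (illustrative name) -- the celery worker
[Unit]
Description=Celery worker for the REST server
After=network.target

[Service]
WorkingDirectory=/opt/myrest
ExecStart=/usr/bin/celery -A server.api.v1.resources.tasks worker --loglevel=INFO
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

```ini
# myrest.service -- the REST server itself, ordered after the worker
[Unit]
Description=REST server
After=network.target myrest-worker.service
Wants=myrest-worker.service

[Service]
WorkingDirectory=/opt/myrest
ExecStart=/usr/bin/python /opt/myrest/main.py
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

The deb's postinst script can then enable and start both units with systemctl; the package itself can be built with tools such as stdeb (bdist_deb from a setup.py) or dpkg-deb over a hand-made directory tree.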


#StackBounty: #python #opencv #python-tesseract #pytesser Highly inconsistent OCR result for tesseract

Bounty: 50

[original screenshot]

This is the original screenshot. I cropped the image into four parts and cleaned up the background as far as I possibly could, but tesseract only detects the last column here and ignores the rest.

[cropped image]

The output from tesseract is shown as it is; there are blank spaces, which I remove while processing the result:




[cropped image]

The output from tesseract is shown as it is; there are blank spaces, which I remove while processing the result:




[cropped image]

3579 10 1 7 148

2962 3 O 7 101

2214 2 2 7 99

2205 1 3 6 78

[cropped image]
I am just dumping the output of

result = pytesseract.image_to_string(Image.open("D:/newapproach/B&W"+str(i)+".jpg"), lang="New_Language")

But I do not know how to proceed from here to get a consistent result. Is there any way to force tesseract to recognize the text? In the trainer, tesseract's default recognition scan also detects nothing, but once I select the area manually, everything is scanned and recognized correctly.
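What often helps more than retraining is feeding tesseract a hard black-and-white image plus a page-segmentation hint. A small sketch of the thresholding step with plain numpy; the pytesseract call in the comment uses the standard image_to_string config parameter (--psm 6 means "assume a single uniform block of text"), but whether it fixes this particular screenshot is an assumption:

```python
import numpy as np

def binarize(gray, threshold=128):
    """Force a grayscale image (2-D uint8 array) to pure black/white."""
    return np.where(np.asarray(gray) >= threshold, 255, 0).astype(np.uint8)

bw = binarize(np.array([[30, 200], [140, 90]]))
# array([[  0, 255],
#        [255,   0]], dtype=uint8)

# then, on your setup:
# text = pytesseract.image_to_string(Image.fromarray(bw),
#                                    lang="New_Language",
#                                    config="--psm 6")
```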



#StackBounty: #python #django #graphql Annotate with django-graphene and filters

Bounty: 100

I would like to sum a field in my resolver of django-graphene using the django_filter filterset. Typically my resolver would look like:

my_model = DjangoFilterConnectionField(MyModelType)

def my_resolver(self, args, context, info):

    return MyModelFilter(args, queryset=MyModel.objects.all()).qs

Which works fine. However, I would like to provide a custom queryset to the model filter so that I can perform aggregations on fields. I’m trying to do something like this:

def my_resolver(self, args, context, info):
    queryset = MyModel.objects.values(
        cost_amt=Sum('cost_amt', output_field=FloatField()))

    return MyModelFilter(args, queryset=queryset).qs

Inspecting the raw SQL in GraphiQL, it looks correct. However, the error message I receive from GraphQL is

"message": "Received incompatible instance "{'cost_amt': 260.36, 'customer_id': 300968697}"."

This is the correct result, but I'm unsure why GraphQL is receiving this object from django-graphene. How can I provide a custom queryset and make this work?


#StackBounty: #python #ssl #websocket #autobahn Connect websocket-client to Autobahn TLS server, ssl options

Bounty: 50

I want to connect this client

to this server.

I don't understand which SSL options would work. I tried CERT_NONE and PROTOCOL_TLS with no success.
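For a server with a self-signed certificate, CERT_NONE alone is not enough on a default context: hostname checking has to be switched off first. A sketch with the stdlib ssl module; the commented create_connection call shows how websocket-client accepts the same options through its sslopt dict (testing only, since this disables verification entirely):

```python
import ssl

ctx = ssl.create_default_context()
ctx.check_hostname = False          # must be disabled before CERT_NONE
ctx.verify_mode = ssl.CERT_NONE     # accept the self-signed server cert

# with websocket-client the equivalent options are passed as a dict:
# ws = websocket.create_connection(
#     "wss://localhost:9000",
#     sslopt={"cert_reqs": ssl.CERT_NONE, "check_hostname": False})
```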


#StackBounty: #python #python-3.x #async-await #configuration Managing python application configuration in a key-value store

Bounty: 200

For a project we're working on, we need a central place to store the configurations for various applications (they're all headless services running in Docker containers, so local configuration files or command-line parameters aren't going to cut it in production).

We’ve chosen to use Consul as the central key-value store and since most of the modules are written in Python I’ve created a config wrapper to interact with it. It’s making use of the python-consul SDK for that purpose.

There are two main modes of operation:

  1. On initialization we load the current configuration values synchronously.
  2. After that a background monitoring job gets kicked off which executes a callback whenever a key changes.

For the second part asyncio is used since Consul provides a long-polling API which blocks key queries until there is an update to the value (or a timeout has elapsed). python-consul provides an asyncio adapter which makes use of aiohttp.

Since this is my first time in working with Python and asyncio I’m looking for feedback on best practices around the use of it. But any other feedback is welcome as well.

Implementation (kv_consul.py):

"""
This implements the interface to the Consul key-value store (http://consul.io)

You'll need: pip install python-consul aiohttp
"""
from typing import List, Callable, Coroutine, Iterable, Union, Tuple
from urllib.parse import urlparse
import logging
import asyncio
import threading

import consul
import consul.aio

class BackgroundTask:
    def run(self, coro: Callable[[any], Coroutine], args: Iterable, done_callback: Callable=None):
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, self._task_runner, coro, args, done_callback)

    def _task_runner(self, coro: Callable[[any], Coroutine], args: Iterable, done_callback: Callable):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            fut = asyncio.ensure_future(coro(*args))
            if done_callback:
                fut.add_done_callback(done_callback)
            loop.run_until_complete(fut)
        finally:
            loop.close()

class KvStoreConfig:
    CONSUL_DEFAULT_SCHEME = 'http'

    def __init__(self, keys: List[str], kvstore_endpoint: str=None):
        self.config_keys = keys

        args = {}
        try:
            if kvstore_endpoint:
                if '//' not in kvstore_endpoint:
                    kvstore_endpoint = '//' + kvstore_endpoint
                parts = urlparse(kvstore_endpoint, scheme=self.CONSUL_DEFAULT_SCHEME)
                if parts.hostname:
                    args['host'] = parts.hostname
                if parts.port:
                    args['port'] = parts.port
                if parts.scheme:
                    args['scheme'] = parts.scheme
        except ValueError:
            logging.exception("Failed to parse Consul endpoint '{}'".format(str(kvstore_endpoint)))

        self.consul_args = args
        self.consul = consul.Consul(**self.consul_args)

    def create_if_not_present(self, full_key: str, value: Union[str, bytes]) -> bool:
        return self.consul.kv.put(full_key, value, cas=0)

    def get_source(self) -> str:
        return "Consul@"+self.consul.http.base_uri

    @staticmethod
    def _decode_consul_data_value(data):
        if data is None:
            return None

        val = data['Value']
        if type(val) == str:
            return val
        if type(val) == bytes:
            return val.decode()
        return str(val)

    def __getitem__(self, item: str) -> Union[str, None]:
        index, data = self.consul.kv.get(item)
        return self._decode_consul_data_value(data)

    def start_monitoring(self, change_callback: Callable[[str], None]) -> bool:
        monitoring_started_event = threading.Event()
        BackgroundTask().run(self._monitor, [change_callback, monitoring_started_event])
        return monitoring_started_event.wait(5)

    async def _monitor(self, change_callback: Callable[[str], None], monitoring_started_event: threading.Event) -> None:
        loop = asyncio.get_event_loop()
        c = consul.aio.Consul(loop=loop, **self.consul_args)

        # get the current indices for each key
        futures = [asyncio.ensure_future(self._get_single_key_index(c, k), loop=loop) for k in self.config_keys]
        results = await asyncio.gather(*futures)
        index_map = {tup[0]: tup[1] for tup in results}

        # at this point we've captured the current index for each key, so even if the key gets modified before the
        # individual monitoring futures are executed we can deal with it since Consul will return immediately with
        # the updated value

        # start monitoring all keys based on the last known index
        awaitables = [self._monitor_single_key(c, k, i, change_callback) for k, i in index_map.items()]
        # block forever - ensures that the event loop keeps running
        await asyncio.wait([asyncio.ensure_future(a, loop=loop) for a in awaitables])

    async def _monitor_single_key(self, c: consul.aio.Consul, key: str, index: str, change_callback: Callable) -> None:
        while True:
            old_index = index
            index, data = await c.kv.get(key, index)
            if old_index != index:
                change_callback(key, self._decode_consul_data_value(data))

    async def _get_single_key_index(self, c: consul.aio.Consul, key: str) -> Tuple[str, str]:
        index, data = await c.kv.get(key)
        return key, index

Implementation notes:

  • the KvStoreConfig‘s interface is the way it is because it’s being used by a more generic config wrapper which also supports loading config settings from the command line and config files (for development, testing and debugging purposes). The idea is that it can be swapped for another implementation if needed (in case we decide to not use Consul any longer)
  • The BackgroundTask is a bit of a crutch since asyncio needs a thread driving an event loop. Since none of the existing application modules are written around asyncio I couldn’t run the event loop on the main thread so had to fork it off to a background thread
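For reference, the crutch can also be expressed without the executor indirection: a plain thread that owns a fresh event loop and runs the coroutine to completion. A minimal, self-contained sketch of that pattern (not the production code above; names are illustrative):

```python
import asyncio
import threading

def run_in_background(coro_func, *args):
    """Run a coroutine on its own event loop in a daemon thread."""
    done = threading.Event()
    result = {}

    def runner():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result['value'] = loop.run_until_complete(coro_func(*args))
        finally:
            loop.close()
            done.set()

    threading.Thread(target=runner, daemon=True).start()
    return done, result

async def sample(x):
    await asyncio.sleep(0.01)   # stand-in for the long-polling watch
    return x * 2

done, result = run_in_background(sample, 21)
assert done.wait(5)
assert result['value'] == 42
```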

Integration tests:

"""
Note: This test fixture requires internet access and a working docker install in order
      to spin up the consul test container.

Requires: pip install python-consul docker
"""
from unittest import TestCase
import socket
import docker
import time
import consul
import threading
import json

from kv_consul import KvStoreConfig

class TestConsulServer:
    def __init__(self):
        docker_client = docker.from_env(version='auto')

        self.api_port = self.find_free_port()

        config = {
            "data_dir": "/consul/data",
            "advertise_addr": "",
            "ports": {
                "http": self.api_port,
                "dns": self.find_free_port(),
                "rpc": self.find_free_port(),
                "serf_lan": self.find_free_port(),
                "serf_wan": self.find_free_port(),
                "server": self.find_free_port(),
            }
        }

        env = {'CONSUL_LOCAL_CONFIG': json.dumps(config)}

        self.consul_container = \
            docker_client.containers.run('consul', 'agent -server -bootstrap-expect=1', environment=env,
                                         detach=True, name='unittest_kv_consul', network_mode='host')
        start = time.time()
        while not self.is_port_open(self.api_port) and time.time() - start < 5:
            time.sleep(0.1)

        if not self.is_port_open(self.api_port):
            raise Exception('Timed out while waiting for Consul to start up')

        while "cluster leadership acquired" not in str(self.consul_container.logs()) and time.time() - start < 15:
            time.sleep(0.1)

        if "cluster leadership acquired" not in str(self.consul_container.logs()):
            raise Exception('Timed out while waiting for Consul to acquire cluster leadership')

        diff = time.time() - start
        print("Consul available within {}sec".format(str(diff)))

    @staticmethod
    def is_port_open(port):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        result = sock.connect_ex(('', port))
        sock.close()
        return result == 0

    @staticmethod
    def find_free_port():
        s = socket.socket()
        s.bind(('', 0))  # 0 means: let the OS pick one for you
        port = s.getsockname()[1]
        s.close()
        return port

    def shutdown(self):
        self.consul_container.remove(force=True)

class TestConsulKvStoreConfig(TestCase):
    consul_server = None
    consul = None

    @classmethod
    def setUpClass(cls):
        cls.consul_server = TestConsulServer()
        cls.consul = consul.Consul(port=cls.consul_server.api_port)

    @classmethod
    def tearDownClass(cls):
        cls.consul_server.shutdown()

    @classmethod
    def create_or_update_keys(cls, kv: {}):
        for k in kv:
            cls.consul.kv.put(k, kv[k])

    @classmethod
    def delete_keys(cls, kv: {}):
        for k in kv:
            cls.consul.kv.delete(k)

    def test_get_known_keys(self):
        kv = {
            'foo': 'bar',
            'foo/sub': '123',
            'sub/foo/bar_dummy': 'here'
        }
        self.create_or_update_keys(kv)
        try:
            c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))
            self.assertEqual('bar', c['foo'])
            self.assertEqual('123', c['foo/sub'])
            self.assertEqual('here', c['sub/foo/bar_dummy'])
        finally:
            self.delete_keys(kv)

    def test_get_unknown_key(self):
        c = KvStoreConfig({}, 'localhost:' + str(self.consul_server.api_port))
        self.assertEqual(None, c['something'])

    def test_create_if_not_present_creates_new_key(self):
        c = KvStoreConfig({}, 'localhost:' + str(self.consul_server.api_port))
        try:
            self.assertTrue(c.create_if_not_present("something/new", "hello"))
            self.assertEqual("hello", c['something/new'])
        finally:
            self.delete_keys({"something/new": None})

    def test_create_if_not_present_does_not_change_existing_key(self):
        kv = {'foo': 'bar'}
        self.create_or_update_keys(kv)
        try:
            c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))
            self.assertFalse(c.create_if_not_present("foo", "hello"))
            self.assertEqual("bar", c['foo'])
        finally:
            self.delete_keys(kv)

    class _KeyUpdateHandler:
        def __init__(self):
            self.updated_key = None
            self.updated_value = None
            self.update_event = threading.Event()

        def __call__(self, *args, **kwargs):
            self.updated_key = args[0]
            self.updated_value = args[1]
            self.update_event.set()

    def test_monitoring_existing_key_update(self):
        kv = {'foo': 'bar'}
        self.create_or_update_keys(kv)
        c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))

        handler = self._KeyUpdateHandler()

        self.assertTrue(c.start_monitoring(handler), msg="Failed to start monitoring")
        self.create_or_update_keys({'foo': 'baz'})
        self.assertTrue(handler.update_event.wait(timeout=5), msg="Timeout while waiting for update callback")
        self.assertEqual('foo', handler.updated_key)
        self.assertEqual('baz', handler.updated_value)
        self.delete_keys(kv)

    def test_monitoring_nonexisting_key_update(self):
        kv = {'foo': 'bar'}
        c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))
        self.assertEqual(None, c['foo'])

        handler = self._KeyUpdateHandler()

        self.assertTrue(c.start_monitoring(handler), msg="Failed to start monitoring")
        self.create_or_update_keys({'foo': 'bar'})
        self.assertTrue(handler.update_event.wait(timeout=5), msg="Timeout while waiting for update callback")
        self.assertEqual('foo', handler.updated_key)
        self.assertEqual('bar', handler.updated_value)

    def test_monitoring_deleted_key_update(self):
        kv = {'foo': 'bar'}
        self.create_or_update_keys(kv)
        c = KvStoreConfig(kv.keys(), 'localhost:' + str(self.consul_server.api_port))
        self.assertEqual('bar', c['foo'])

        handler = self._KeyUpdateHandler()

        self.assertTrue(c.start_monitoring(handler), msg="Failed to start monitoring")
        self.delete_keys(kv)
        self.assertTrue(handler.update_event.wait(timeout=5), msg="Timeout while waiting for update callback")
        self.assertEqual('foo', handler.updated_key)
        self.assertEqual(None, handler.updated_value)

    def test_get_source_http(self):
        c = KvStoreConfig({}, 'http://localhost:1234')
        self.assertEqual("Consul@http://localhost:1234", c.get_source())

    def test_get_source_https(self):
        c = KvStoreConfig({}, 'https://localhost:1234')
        self.assertEqual("Consul@https://localhost:1234", c.get_source())

    def test_get_source_default_scheme(self):
        c = KvStoreConfig({}, 'localhost:5678')
        self.assertEqual("Consul@http://localhost:5678", c.get_source())

    def test_get_source_default_port(self):
        c = KvStoreConfig({}, 'http://localhost')
        self.assertEqual("Consul@http://localhost:8500", c.get_source())

    def test_get_source_default_scheme_port(self):
        c = KvStoreConfig({}, 'localhost')
        self.assertEqual("Consul@http://localhost:8500", c.get_source())

    def test_get_source_default_all(self):
        c = KvStoreConfig({})
        self.assertEqual("Consul@", c.get_source())

    def test_get_source_ip(self):
        c = KvStoreConfig({}, '')
        self.assertEqual("Consul@", c.get_source())

Test notes:

  • Fires up a temporary docker container to test with


#StackBounty: #python #linux #process #multiprocessing #spawn Multiprocessing on linux works with "spawn" only?

Bounty: 50

Problem description
I adjusted the code from this answer a little bit (see below). However, when running this script on Linux (command line: python script_name.py), it prints jobs running: x for all the jobs but then just seems to get stuck. However, when I use the spawn method (mp.set_start_method('spawn')) it works fine and immediately starts printing the value of the counter variable (see the listener method).


  • Why does it work only when spawning processes?
  • How can I adjust the code so it works with fork (because it’s probably faster)?


import io
import csv
import multiprocessing as mp


def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    # now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

def worker(genome_row, q):
    complete_data = []
    # data processing
    # ftp connection to retrieve data
    q.put(complete_data)
    return complete_data

def listener(q):
    '''listens for messages on the q, writes to file. '''
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while 1:
        m = q.get()
        counter += 1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + '\n')
        f.flush()
    f.close()

if __name__ == "__main__":
    file_searcher('input.tsv')  # illustrative path


#StackBounty: #centos #http #python CentOS and multiple python website developers

Bounty: 50

I need to be able to support multiple (~100) different users, each with their own website, on a CentOS-based web server. They need to be able to use Python (v2 and v3) along with Django. I understand that a systemctl restart is required for Apache after changes; that can be arranged by a cron job. However, I have no idea about the other tips, tricks, and requirements from the admin side. Is there a website that would be of use to me in setting up the server? I understand that each user could run their own web server (SimpleHTTPServer), but that looks very messy to me.

I would be grateful for any help regarding the issue.
