#StackBounty: #python #processing #permutation How to use permutation to re-order units based on their degree of desirable neighborhood…

Bounty: 50

I would need help to implement a permutation algorithm allowing the generation of building plans, that I’ve recently stumbled on while reading Professor Kostas Terzidis’ latest publication: Permutation Design: Buildings, Texts and Contexts (2014).


  • Consider a site (b) that is divided into a grid system (a).
  • Let’s also consider a list of spaces to be placed within the limits of the site ( c ) and an adjacency matrix to determine the placement conditions and neighboring relations of these spaces (d)

enter image description here

Quoting Prof. Terzidis:

“A way of solving this problem is to stochastically place spaces within the grid until all spaces are fit and the constraints are satisfied”

The figure above shows such a problem and a sample solution (f).

ALGORITHM (as briefly described in the book)

1/ “Each space is associated with a list that contains all other spaces sorted according to their degree of desirable neighborhood.”

2/ “Then each unit of each space is selected from the list and then one-by-one placed randomly in the site until they fit in the site and the neighboring conditions are met. (If it fails then the process is repeated)”

Example of nine randomly generated plans:

enter image description here

I should add that the author explains later that this algorithm doesn’t rely on brute force techniques.


As you can see, the explanation is relatively vague and step 2 is rather unclear (in terms of coding). All I have so far are “pieces of a puzzle”. On one hand:

  • a “site” (list of selected integers)
  • an adjacency matrix (nestled lists)
  • “spaces” (dictionnary of lists)
    from random import shuffle
    n_col, n_row = 7, 5
    to_skip = [0, 1, 21, 22, 23, 24, 28, 29, 30, 31]
    site = [i for i in range(n_col * n_row) if i not in to_skip]
    #Makes a copy of "site" and shuffle its order for future random placing
    ssite = site; shuffle(ssite)
    n = 2
    k = (n_col * n_row) - len(to_skip)
    rsize = 30
    #Adjacency matrix
    adm = [[0, 6, 1, 5, 2],
           [6, 0, 1, 4, 0],
           [1, 1, 0, 8, 0],
           [5, 4, 8, 0, 3],
           [2, 0, 0, 3, 0]]
    spaces = {"office1": [1 for i in range(4)], 
              "office2": [2 for i in range(6)], 
              "office3": [3 for i in range(6)],
              "passage": [4 for i in range(7)],
              "entry": [5 for i in range(2)]}
    def setup():
        size(600, 400, P2D)
        #Grid's background
        rect(width/2 - (rsize/2) , height/2 + rsize/2 + n_row , rsize*n_col, rsize*n_row)
        #Displaying site (all selected units)
        for i in site:
            rect(width/2 - (rsize*n_col/2) + (i%n_col * rsize), height/2 + (rsize*n_row/2) + (n_row - ((k+len(to_skip))-(i+1))/n_col * rsize), rsize, rsize)
        #For each unit in each space...
        i = -1
        for space in spaces.items():
            for unit in space[1]:
                #get the indices of its desired neighbors in sorted order
                ada = adm[unit-1]
                sorted_indices = sorted(range(len(ada)), key = lambda k: ada[k])[::-1]
                #place each unit number randomly in the site
                text(unit, width/2 - (rsize*n_col/2) + (ssite[i]%n_col * rsize), height/2 + (rsize*n_row/2) + (n_row - ((k+len(to_skip))-(ssite[i]+1))/n_col * rsize))

enter image description here

And on the other hand:

  • a simple permutation algorithm
    rsize = 60
    n_col, n_row = (4, 3)
    n, k = 2, int(n_col * n_row)
    bits = [[] for i in range(int(pow(n, k)))]
    def setup():
        size(700, 400, P2D)
        for i in range(int(pow(n, k))):
            for j in range(k):
                bit = i / int(pow(n, k - j - 1)) % n + 1
    def draw():
        for i in range(k):
            bit = bits[frameCount % len(bits)][i]
            if bit == 1: fill(0)
            if bit == 2: fill(255, 0, 0)
            rect(width / 2 - rsize * n_col / 2 + i % n_col * rsize, height
                 / 2 + rsize * n_row / 2 + n_row - (k - (i + 1)) / n_col
                 * rsize, rsize, rsize)
            text(bit - 1, 60 + i * 8, 20)
        text('GENE:', 20, 20)
        text(u'NO.', 20, 40)
        text(frameCount, 45, 40)
        text('Remaining', 20, 60)
        text(len(bits) - frameCount, 90, 60)
        if frameCount == len(bits): noLoop()

enter image description here

I would really appreciate if someone could help connect the dots and explain me:

  • how to re-order the units based on their degree of desirable neighborhood ?
  • how is it related to permutation ?

Get this bounty!!!

#StackBounty: #python #django #oop Create a Blog which support multiple type of post

Bounty: 50

I am a new user of Django, and I am trying to figure out how to created a model which can support many kind (type) of elements.

This is the plot : I want to create a Blog module on my application.
To do this, I created a model Page, which describe a Blog Page. And a model PageElement, which describe a Post on the blog. Each Page can contain many PageElement.

A PageElement can have many types, because I want my users could post like just a short text, or just a video, or just a picture. I also would like (for example) the user could just post a reference to another model (like a reference to an user). Depending of the kind of content the user posted, the HTML page will display each PageElement in a different way.

But I don’t know what is the right way to declare the PageElement class in order to support all these cases 🙁

Here is my Page model :

class Page(models.Model):
    uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)

    # Basical informations
    title = models.CharField(max_length=150)
    description = models.TextField(blank=True)

    # Foreign links
    user = models.ForeignKey(

    created_at = models.DateTimeField(default=timezone.now)

    # Other fields ....

    class Meta:
        indexes = [
            models.Index(fields=['user', 'artist'])

For now, I have two solutions, the first one use inheritance : When you create a new post on the blog, you create an Element which inherit from PageElement model. Here are my different Models for each cases :

class PageElement(models.Model):
    page = models.OneToOneField(

    updated_at = models.DateTimeField(default=timezone.now)
    created_at = models.DateTimeField(default=timezone.now)

class PageImageElement(PageElement):
    image = models.ImageField(null=True)
    image_url = models.URLField(null=True)

class PageVideoElement(PageElement):
    video = models.FileField(null=True)
    video_url = models.URLField(null=True)

class PageTextElement(PageElement):
    text = models.TextField(null=True)

class PageUserElement(PageElement):
    user = models.ForeignKey(

This solution would be the one I have choosen if I had to work if a “pure” Django. Because I could stored each PageElement in a dictionnary and filter them by class. And this solution could be easily extended in the futur with new type of content.

But with Django models. It seems that is not the best solution. Because it will be really difficult to get all PageElement children from the database (I can’t just write “page.elements” to get all elements of all types, I need to get all %(class)s_elements elements manually and concatenate them :/). I have thinked about a solution like below (I don’t have tried it yet), but it seems overkilled for this problem (and for the database which will have to deal with a large number of request):

class Page(models.Model):
    # ...
    def get_elements(self):
        # Retrieve all PageElements children linked to the current Page
        R = []
        fields = self._meta.get_fields(include_hidden=True)
        for f in fields:
                if 'conversation_elements' in f.name:
                    R += getattr(self, f.name)
            except TypeError as e:

        return R

My second “solution” use an unique class which contains all fields I need. Depending of the kind of PageElement I want to create, I would put type field to the correct value, put the values in the corresponding fields, and put to NULL all other unused fields :

class PageElement(models.Model):
    page = models.OneToOneField(

    updated_at = models.DateTimeField(default=timezone.now)
    created_at = models.DateTimeField(default=timezone.now)

        ('img', 'Image'),
        ('vid', 'Video'),
        ('txt', 'Text'),
        ('usr', 'User'),
    type = models.CharField(max_length=60, choices=TYPES_CHOICE)

    # For type Image
    image = models.ImageField(null=True)
    image_url = models.URLField(null=True)

    # For type Video
    video = models.FileField(null=True)
    video_url = models.URLField(null=True)

    # For type Text
    text = models.TextField(null=True)

    # For type User
    user = models.ForeignKey(

With this solution, I can retrieve all elements in a single request with “page.elements”. But it is less extendable than the previous one (I need to modify my entire table structure to add a new field or a new kind of Element).

To be honnest, I have absolutly no idea of which solution is the best. And I am sure other (better) solutions exist, but my poor Oriented-Object skills don’t give me the ability to think about them ( 🙁 )…

I want a solution which can be easily modified in the future (if for example, I want to add a new Type “calendar” on the Blog, which reference a DateTime). And which would be easy to use in my application if I want to retrieve all Elements related to a Page…

Thanks for your attention 🙂

Get this bounty!!!

#StackBounty: #python #pyodbc #python-unittest Unit Test Pyodbc Database Connection

Bounty: 50

I wrote the following unit test to test whether the connection has been successfully established or not.

import unittest
from databse_access_pyodbc import *
from pyodbc import OperationalError

class TestDatabseConnection(unittest.TestCase):

    def test_connection_db(self):
            db_connection = get_db_connection()
        except OperationalError as err:
                "get_db_connection() raised pyodbc.OperationalError. " +
                "Connection to database failed. Detailed error message: " + err)

if __name__ == '__main__':

If the connection can be established, the test says:

Ran 1 test in 0.001s


Process finished with exit code 0

In case the connection could not be established, I’ll get the following:

Traceback (most recent call last):
  File "xxxPyCharm-Pch-0182.4505.26helperspycharm_jb_unittest_runner.py", line 35, in <module>
    main(argv=args, module=None, testRunner=unittestpy.TeamcityTestRunner, buffer=not JB_DISABLE_BUFFERING)
  File "xxxPython36libunittestmain.py", line 94, in __init__
  File "xxxPython36libunittestmain.py", line 141, in parseArgs
  File "xxxPython36libunittestmain.py", line 148, in createTests
  File "xxxPython36libunittestloader.py", line 219, in loadTestsFromNames
    suites = [self.loadTestsFromName(name, module) for name in names]
  File "xxxPython36libunittestloader.py", line 219, in <listcomp>
    suites = [self.loadTestsFromName(name, module) for name in names]
  File "xxxPython36libunittestloader.py", line 153, in loadTestsFromName
    module = __import__(module_name)
  File "xxxteststest_database_access_pyodbc.py", line 2, in <module>
    from databse_access_pyodbc import *
  File "xxxdatabse_access_pyodbc.py", line 40, in <module>
  File "xxxdatabse_access_pyodbc.py", line 36, in get_db_connection
pyodbc.OperationalError: ('08001', '[08001] [Microsoft][ODBC Driver 11 for SQL Server]Named Pipes-Anbieter: Es konnte keine Verbindung zu SQL Server hergestellt werden [53].  (53) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 11 for SQL Server]Anmeldungstimeout abgelaufen (0); [08001] [Microsoft][ODBC Driver 11 for SQL Server]Netzwerkbezogener oder instanzspezifischer Fehler beim Herstellen einer Verbindung mit SQL Server. Der Server wurde nicht gefunden, oder auf ihn kann nicht zugegriffen werden. Überprüfen Sie, ob der Instanzname richtig ist und ob SQL Server Remoteverbindungen zulässt. Weitere Informationen erhalten Sie in der SQL Server-Onlinedokumentation. (53)')

Process finished with exit code 1
Empty test suite.

Why is the exception OperationalError not catched by the test case and why does it say: Empty test suite?

Get this bounty!!!

#StackBounty: #python #pipeline #luigi Luigi – Overriding Task requires/input

Bounty: 50

I am using luigi to execute a chain of tasks, like so:

class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('test.json')

    def run(self):
        with self.output().open('w') as f:

class Task2(luigi.Task):
    stuff = luigi.Parameter()

    def requires(self):
        return Task1(stuff=self.stuff)

    def output(self):
        return luigi.LocalTarget('something-else.json')

    def run(self):
        with self.output().open('w') as f:

This works exactly as desired when I start the entire workflow like so:


When using luigi.build you can also run multiple tasks by explicitly passing arguments, as per this example in the documentation.

However, in my situation, I would also like to be able to run the business logic of Task2 completely independently of it’s involvement in the workflow. This works fine for tasks that do not implement requires, as per this example.

My question is, how can I run this method both as part of the workflow, as well as on it’s own? Obviously, I could just add a new private method like _my_custom_run, which takes the data and returns the result, and then use this method in run, but it just feels like something that should be baked into the framework, so it makes me feel like I am misunderstanding Luigi’s best practices (still learning the framework). Any advice is appreciated, thanks!

Get this bounty!!!

#StackBounty: #python #matplotlib #rotation #logarithm Rotating text onto a line on a log scale in Matplotlib

Bounty: 50


I am working with data on a logarithmic scale and would like to rotate it to fit a line. I know the model but am unsure quite what angle I should be plugging into transform_angles to recover the correct rotation. After a bit of trial and error I know the answer is around 10 degrees for the axes limits I require.


import matplotlib.pylab as plt
import numpy as np

plt.ylim((1e-11, 1e-1))  # Other data is usually plotted and these are the ranges I need. 
plt.xlim((-0.2, 7.2))
x_fit = np.linspace(0.8, 3.2, 1000)
y_ols = (lambda x: np.exp(np.log(2)*(-20.8 + -1.23 * x)))(x_fit)  # I get these numbers from OLS fitting. 
plt.plot(x_fit, y_ols, 'b-', dashes='', label='__nolegend__')
plt.gca().text(np.min(x_fit), 1.2*y_ols[0], r'$O(2^{{ {:.3}x }})$'.format(-1.23), rotation=-10).set_bbox(dict(facecolor='w', alpha=0.7, edgecolor='k', linewidth=0))  # There are several others lines which have been omitted. 

enter image description here

Similar questions (keeps text rotated in data coordinate system after resizing?) only use linear axes, as do the matplotlib demos.

Remarks on the plot to answer comments

  • In my full plot I use a dual axis (both on log scales) with the twinx() feature. All the data are plotted on ax1 which uses a log-10 scale (as shown). (I could be more explicit and write yscale('log', basey=10)…). Ultimately I want a base-10 axis.
  • The model used in making y_ols comes from a regression fit to some original data and requires base-2. On a log scale it is easy enough to recover the gradient in any required base.

Using gradients

It is easy enough to recover the gradient on a logarithmic scale, using a mix of np.gradient and an angle (in radians) using np.arctan, but I can’t seem to recover a number close to the 10 degrees (0.17 radians).

transData.transform_angles(np.array((np.mean(np.gradient(np.log10(y_ols), np.mean(np.diff(x_fit)))),)), np.array([np.min(x_fit), 1.2*y_ols[0]]).reshape((1, 2)), radians=True)[0]

gives -1.6radians (approximately -90 degrees), whereas I require a number closer to 0.17radians. Perhaps I should be using a different base, or I am doing this all wrong (hence the post).

Extras – vertical offset

As can be seen in the code, I have added a vertical offset for the anchor point when using 1.2*y_ols[0]. If a solution needs to take this into consideration then all the better.

Get this bounty!!!

#StackBounty: #python #game #animation #raspberry-pi Runner 1-bit game in Python like the offline chrome dino

Bounty: 50

I made this little game to test if my Nokia 5110 screen can handle games. As in handle many frames per second/game loops.

I am using a library made for this screen called Adafruit_Nokia_LCD

These are the imports for some context, if needed:

import Adafruit_Nokia_LCD as LCD
import Adafruit_GPIO.SPI as SPI
import threading
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont

This is how I load images. I have some black and white images, use a binary converter to turn them into a multiline string. These are some tamagotchi sprites for example:

tam1 = """00000000000000111100000

There are two more sprites, for a walking animation called tam2 and tam3

I split them into lines:

tam1a = tam1.split('n')
tam2a = tam2.split('n')
tam3a = tam3.split('n')

I have a different thread running to capture key input, in which case is spacebar to jump:

key = "lol" #uhh this is the keyword for no input pretty much idk i just left it in
getch = _Getch()
def thread1():
    global key
    lock = threading.Lock()
    while True:
        with lock:
            key = getch() #this is some code that works just like the msvcrt version
threading.Thread(target = thread1).start()

This is the game loop, after setting some variables:

#jump variables 
dist = 0 #distance off the ground
gup = False #isjumping
#obstacle variables
xx = 0 #x position of the obstacle that loops through the screen

#other variables
score = 0;
ind = 0; #current frame of player, there is 3
extraspeed = 0; #makes the pillar go faster as your score rises
while True: #gameloop start
    draw.rectangle((0,0,LCD.LCDWIDTH,LCD.LCDHEIGHT), outline=255, fill=255)
    #clears the screen --^
    draw.text((0,0),str(score),font=font) #draw score
    extraspeed = floor(score / 100) #set extraspeed based on score
    if extraspeed > 10:
        extraspeed = 10
    score += 3
    xx = xx + 4 +extraspeed #move the pillar
    if xx >= 84:
        xx = -4;
    if key == ' ':
        if dist == 0:
            gup = True
            key = "lol" #if dist = 0 means its on the ground
            #the "lol" thing is something I left over for some reason, it means no key pressed
            key = "lol"
    if gup == True:
        if dist != 12: #jumps up to 12 pixels off the ground
            dist += 3
            gup = False #start falling if reached top
        if dist > 0:
            dist -= 3
            dist = 0
    #ind is the current animation sprite to draw
    if ind == 1:
        i = 12-dist #top left corner of drawn sprite y
        j = 60 #top left corner of drawn sprite x
        for line in tam1a:
            for c in line:
                if c == '1':
            j=60 #make same as j at start
    if ind == 2:
        i = 12-dist #top left corner of drawn sprite y
        j = 60 #top left corner of drawn sprite x
        for line in tam2a:
            for c in line:
                if c == '1':
            j=60 #make same as j at start
    if ind == 3:
        i = 12-dist #top left corner of drawn sprite y
        j = 60 #top left corner of drawn sprite x
        for line in tam3a:
            for c in line:
                if c == '1':
            j=60 #make same as j at start

    ind += 1
    if ind == 4:
        ind = 1 #restart the animation
    draw.line((0,44,83,44),fill=0) #draw some ground
    if xx >= float(67) and xx<= float(80) and dist <= 7:
        break #Some simple collision detection

# Display image.
    disp.image(image) #draw everything
    disp.display() #display everything

    time.sleep(0.2) #5 frames per second
draw.text((40,10),'Hit',font=font) #got here if 'break' occurs
disp.image(image) #displays hit, which means game over and loop is broken

My problem:

First of all: This screen is connected through GPIO pins to my Raspberry-Pi.

What do you think of this method to load sprites?

How about drawing the sprites? I iterate through the string, line by line then character by character and draw the pixels respectively (0 means dont draw, 1 means draw black)

Now while this code works, I experience ghosting and image burn in my lcd screen. While I know the nokia 5110 screen wasn’t designed to be for a gaming system, can my code be optimized to a point of reducing this ghosting ?

What do you think of my key input getting method, is a seperate thread fine for this job?

Lastly, would a different drawing method allow me to reach 60 frames per second or is it just impossible due to the way this screen is manufactured?

Here’s a pic. Notice how there is image burn from the last frame? There is only one pillar, and only one frame of the player, but two appear. Is this fixable, or is it because of the screen only?

Image burn and ghosting

Get this bounty!!!

#StackBounty: #python #django #session #cookies #redis Django: Sessions not working, users continually being logged out and prompted to…

Bounty: 100

I want to keep users on a Heroku app logged in, and they are currently being logged out on every view that requires login. The problem seems to be related to the sessionid in the Response Cookie not being set. E.g., clicking on “Account” will redirect users to the login view.

Response Headers

Location: /login?next=/home/
Set-Cookie: sessionid=; Domain=.appname.com; expires=Thu, 01-Jan-1970 00:00:00 GMT; Max-Age=0; Path=/


SESSION_ENGINE = 'django.contrib.sessions.backends.cached_db'
SESSION_COOKIE_DOMAIN = ".appname.com"

CSRF_COOKIE_DOMAIN = ".appname.com"


# ...


     # ...


# ...

    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
        'NAME': 'appname',

# https://devcenter.heroku.com/articles/python-concurrency-and-database-connections
db_from_env = dj_database_url.config(conn_max_age=500)

# https://devcenter.heroku.com/articles/heroku-redis
    "default": {
         "BACKEND": "redis_cache.RedisCache",
         "LOCATION": os.environ.get('REDIS_URL'),


def login(request):
    if hasattr(request, 'session') and not request.session.session_key:
        request.session.modified = True
    if request.user.is_authenticated():
        return redirect('home')
        return render(request, "login.html", {})

def home(request):
    return render(request, "home.html", {})

def ajax_login(request):
    data = {}

    if request.method == 'POST':
        form = LoginForm(request.POST)

        if form.is_valid():
            email = form.cleaned_data['email']
            password = form.cleaned_data['password']

        user = authenticate(username=email, password=password)
        if user is not None:
            login(request, user)
            data = {
                'is_authenticated': True
        form = LoginForm()
        data = {
            'is_authenticated': False
    return JsonResponse(data)

Here is how the app makes AJAX calls.


  beforeSend: function(xhr, settings) {
    if (!csrfSafeMethod(settings.type) && !this.crossDomain) {
      xhr.setRequestHeader("X-CSRFToken", csrftoken);
  url: '/ajax/login',
  type: 'POST',
  data: {
    'username': username,
    'password': password,
  dataType: 'json',
  credentials: 'include',
  success: function (data) {
    if (data.is_authenticated) {
      window.location.href = "/home/";


var csrftoken = $("[name=csrfmiddlewaretoken]").val();

Get this bounty!!!

#StackBounty: #python #phylogeny How does TnT (Goloboff et al., 2008) assign internal node labels?

Bounty: 50

I have used TnT v1.1 (Goloboff et al., 2008) to assign synapomorphies from a presence-absence matrix (PAM) to a phylogeny. TnT does not allow for internal node IDs or branch lengths and I need to use the branch lengths with the TnT data for statistical analyses. I have a huge dataset (>1000 taxa) so assigning branch lengths by hand would be tedious. TnT begins internal node naming based on the first species in the data file (eg if a datafile with 100 species is presented and Aspergillus flavus is the first species, then the node preceding A. flavus is annotated N101). TnT provides internal nodes in its ASCII tree but not in its output newick tree.

Does anyone know how I can either (a) get TnT to print out the internal nodes in a newick tree or how I can replicate TnTs naming procedure, preferably using python (ete2/ete3)?


Get this bounty!!!

#StackBounty: #python #python-3.x #python-multiprocessing Avoid global variables for unpicklable shared state among multiprocessing.Poo…

Bounty: 100

I frequently find myself writing programs in Python that construct a large (megabytes) read-only data structure and then use that data structure to analyze a very large (hundreds of megabytes in total) list of small records. Each of the records can be analyzed in parallel, so a natural pattern is to set up the read-only data structure and assign it to a global variable, then create a multiprocessing.Pool (which implicitly copies the data structure into each worker process, via fork) and then use imap_unordered to crunch the records in parallel. The skeleton of this pattern tends to look like this:

classifier = None
def classify_row(row):
    return classifier.classify(row)

def classify(classifier_spec, data_file):
    global classifier
        classifier = Classifier(classifier_spec)
        with open(data_file, "rt") as fp, 
             multiprocessing.Pool() as pool:
            rd = csv.DictReader(fp)
            yield from pool.imap_unordered(classify_row, rd)
        classifier = None

I’m not happy with this because of the global variable and the implicit coupling between classify and classify_row. Ideally, I would like to write

def classify(classifier_spec, data_file):
    classifier = Classifier(classifier_spec)
    with open(data_file, "rt") as fp, 
         multiprocessing.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)

but this does not work, because the Classifier object usually contains objects which cannot be pickled (because they are defined by extension modules whose authors didn’t care about that); I have also read that it would be really slow if it did work, because the Classifier object would get copied into the worker processes on every invocation of the bound method.

Is there a better alternative? I only care about 3.x.

Get this bounty!!!

#StackBounty: #python #python-asyncio How to create asyncio stream reader/writer for stdin/stdout?

Bounty: 100

I need to write two prorgams which will be run as a parent process and its child. The parent process spawns the child and then they communicate via pair of pipes connected to child’s stdin and stdout. The communication is peer-to-peer, that’s why I need asyncio. A simple read/reply loop won’t do.

I have written the parent. No problem because asyncio provides everything I needed in create_subprocess_exec().

However I don’t know how to create a similar stream reader/writer in the child. I did not expect any problems. because the pipes are already created and file descriptors 0 and 1 are ready to use when the child process starts. No connection is to be open, no process needs to be spawned.

This is my not working attempt:

import asyncio
import sys

_DEFAULT_LIMIT = 64 * 1024

async def connect_stdin_stdout(limit=_DEFAULT_LIMIT, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    r_transport, _ = await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, _ = await loop.connect_write_pipe(lambda: protocol, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
    return reader, writer

The problem is I have two transports where I should have one. The function fails, because it tries to set the protocol’s transport twice:

await loop.connect_read_pipe(lambda: protocol, sys.stdin)
await loop.connect_write_pipe(lambda: protocol, sys.stdout)
# !!!! assert self._transport is None, 'Transport already set'

I tried to pass a dummy protocol to the first line, but this line is not correct either, because both transports are needed, not just one:

writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)

I guess I need to combine two unidirectional transports to one bidirectional somehow. Or is my approach entirely wrong? Could you please give me some advice?

UPDATE: after some test this seems to work (but does not look good to me):

async def connect_stdin_stdout(limit=_DEFAULT_LIMIT, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    dummy = asyncio.Protocol()
    await loop.connect_read_pipe(lambda: protocol, sys.stdin) # sets read_transport
    w_transport, _ = await loop.connect_write_pipe(lambda: dummy, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
return reader, writer

Get this bounty!!!