#StackBounty: #python-3.x #pandas #csv #dataset Read and reverse data chunk by chunk from a csv file and copy to a new csv file

Bounty: 100

I’m dealing with a very large csv file. So, I can only read the data chunk by chunk into the memory. The expected flow of events is as follows:

1) Read chunk (eg: 10 rows) of data from csv using pandas.

2) Reverse the order of data

3) Copy each row to new csv file

The reason for reversing the data is that the csv file contains data from latest to old (1st row is the latest entry) and since I am trying to do a time series forecasting I need data to be from old to latest (1st row oldest entry).
My existing code does not copy the rows into the new csv file properly.

The dataset is the train.csv of the Rossmann dataset from kaggle

Shown below is my code:

import pandas as pd
import csv



def reverse():
    """Copy ``train.csv`` to ``processed_train.csv`` with each chunk reversed.

    A header row is appended to ``processed_train.csv`` first.  The input is
    then read ten rows at a time; the rows of every chunk are reversed and
    handed to ``append_data_csv``.  Note that only the rows *inside* each
    chunk are reversed -- the chunks themselves are still processed in file
    order.
    """
    fields = ["Store", "DayOfWeek", "Date", "Sales", "Customers", "Open",
              "Promo", "StateHoliday", "SchoolHoliday"]

    # newline='' is what the csv module asks for: without it the writer's
    # '\r\n' terminator is translated again on Windows, producing a blank
    # line after every row.
    with open('processed_train.csv', mode='a', newline='') as stock_file:
        writer = csv.writer(stock_file, delimiter=',', quotechar='"',
                            quoting=csv.QUOTE_MINIMAL)
        writer.writerow(fields)

    for chunk in pd.read_csv("train.csv", chunksize=10):
        # Reverse the row order of this chunk only.
        store_data = chunk.iloc[::-1]
        append_data_csv(store_data)

def append_data_csv(store_data):
    """Append every row of ``store_data`` to ``processed_train.csv``.

    ``store_data`` is a DataFrame containing the nine Rossmann columns listed
    below; rows are written in the DataFrame's current order and each row is
    also printed to stdout.
    """
    columns = ("Store", "DayOfWeek", "Date", "Sales", "Customers", "Open",
               "Promo", "StateHoliday", "SchoolHoliday")

    # newline='' stops text-mode newline translation from turning the csv
    # writer's '\r\n' into '\r\r\n' (an empty line after each row) on Windows.
    with open('processed_train.csv', mode='a', newline='') as store_file:
        writer = csv.writer(store_file, delimiter=',', quotechar='"',
                            quoting=csv.QUOTE_MINIMAL)
        for _, row in store_data.iterrows():
            print(row)
            writer.writerow([row[name] for name in columns])




reverse()

Thank you, in advance


Get this bounty!!!

#StackBounty: #python #python-3.x #opengl #pyqt #pyqt5 Problems with openGL and PyQt5

Bounty: 50

I am moving from Winpython3.4.3.7Qt4 to Winpython3.x.x.xQt5 (I tried a bunch of versions), and I am having the following problem:

The following minimal code (it does nothing usable, but demonstrates the error):

from PyQt5 import QtWidgets
import OpenGL.GL as gl
from PyQt5.QtOpenGL import QGLWidget
# Minimal reproduction: create a GL widget, make its context current and
# request one display-list name.
qapp = QtWidgets.QApplication([])
window = QGLWidget()
# The GL call below needs a current OpenGL context; makeCurrent() is expected
# to provide the widget's context.
window.makeCurrent()
# Ask OpenGL for one display-list name; this is the call that fails with
# GLError 1282 in the traceback quoted below.
index = gl.glGenLists(1)
print(index)

runs on all my machines with Winpython3.4.3.7Qt4 and prints ‘1’. It does not, however, run on all machines when I use Winpython3.x.x.xQt5. The error I get is:

Traceback (most recent call last):
  File ".\opengl.py", line 12, in <module>
    index = gl.glGenLists(1)
  File "C:\Winpython-64bit-3.6.7.0\python-3.6.7\lib\site-packages\OpenGL\platform\baseplatform.py", line 405, in __call__
    return self( *args, **named )
  File "C:\Winpython-64bit-3.6.7.0\python-3.6.7\lib\site-packages\OpenGL\error.py", line 232, in glCheckError
    baseOperation = baseOperation,
OpenGL.error.GLError: GLError(
        err = 1282,
        description = b'invalid operation',
        baseOperation = glGenLists,
        cArguments = (1,),
        result = 0
)

I have the feeling that window.makeCurrent() isn’t coming through, but I have no idea why. What changed in that regard from Qt4 to Qt5?


Get this bounty!!!

#StackBounty: #python #python-3.x #game #role-playing-game creating procedural generated RPG Dungeon maps

Bounty: 50

Intro

I am building a roguelike RPG. It is still a work in progress, but I would like to get an intermediate review of my procedural map generation code.

I’ve chosen to use the Binary Space Partitioning algo, because it would make my life easier in the future when I will add corridors to the rooms.

There are some TODO’s left,

  • Creating random rooms within the leaves
  • Adding corridors to connect the (random-sized) rooms

but they are not up for review.

Code

from queue import Queue
from random import choice, randint
from collections import namedtuple
from enum import Enum

# Grid coordinate stored row-first: ``y`` indexes the board row and ``x`` the
# column within that row (the board is accessed as ``board[pos.y][pos.x]``).
Position = namedtuple("Position", ["y", "x"])

class Tile(Enum):
    """Characters used to draw the map; the board stores the ``.value``."""
    # Written for every position in a room's border_map.
    WALL = '#'
    # Written for every position in a room's position_map (its interior).
    EMPTY = ' '
    # Initial fill of the whole board before rooms are drawn.
    ROCK = '.'

class Split(Enum):
    """Direction in which Map._split_room divides a leaf."""
    # Split position is chosen along x: the leaf is cut into a left and a
    # right part.
    HORIZONTAL = 0
    # Split position is chosen along y: the leaf is cut into an upper and a
    # lower part.
    VERTICAL   = 1

class Room:
    """Axis-aligned rectangle on the map grid.

    ``lu`` is the left-upper corner and ``rd`` the right-down corner.  Both
    are objects with ``y`` (row) and ``x`` (column) attributes, and both
    corners belong to the room's border.
    """

    def __init__(self, lu, rd):
        self.lu = lu
        self.rd = rd

    @property
    def position_map(self):
        """Positions strictly inside the border (the room's floor)."""
        return self._get_positions()

    @property
    def border_map(self):
        """Positions lying on the rectangle's outline (the room's walls)."""
        return self._get_border()

    @property
    def width(self):
        """Difference between the corner columns (``rd.x - lu.x``)."""
        return self.rd.x - self.lu.x

    @property
    def height(self):
        """Difference between the corner rows (``rd.y - lu.y``)."""
        return self.rd.y - self.lu.y

    def _get_positions(self):
        """Return the interior positions, column by column."""
        return [
            Position(row, col)
            for col in range(self.lu.x + 1, self.rd.x)
            for row in range(self.lu.y + 1, self.rd.y)
        ]

    def _get_border(self):
        """Return left edge, right edge, top edge and bottom edge, in that order.

        The left/right edges stop one row short of ``rd.y``; the bottom edge
        runs through ``rd.x`` inclusive and therefore supplies both bottom
        corners.  The top-left corner is listed twice (left edge and top
        edge), which is harmless for drawing.
        """
        rows = range(self.lu.y, self.rd.y)
        # Parentheses carry the expression across lines; no backslash
        # continuations are needed.
        return (
            [Position(y, self.lu.x) for y in rows]
            + [Position(y, self.rd.x) for y in rows]
            + [Position(self.lu.y, x) for x in range(self.lu.x, self.rd.x)]
            + [Position(self.rd.y, x) for x in range(self.lu.x, self.rd.x + 1)]
        )

class Leaf:
    """Node of the binary-space-partition tree covering a rectangle of the map.

    ``lu`` / ``rd`` are the left-upper and right-down corners, ``parent`` is
    the leaf this one was split from (``None`` for the root) and
    ``min_room_space`` is the minimum extent stored for this leaf.
    """

    def __init__(self, lu, rd, parent, min_room_space):
        self.lu = lu
        self.rd = rd
        self.parent = parent
        self.min_room_space = min_room_space
        # Plain attributes: the former property/setter pairs only forwarded
        # to a private field, so direct attribute access is equivalent.
        self.children = None   # (leaf_a, leaf_b) once this leaf is split
        self.sibling = None    # the other leaf produced by the same split
        self._room = None      # cache for the lazily created room

    @property
    def room(self):
        """The leaf's room, created on first access and cached afterwards."""
        return self._room or self._generate_room()

    @property
    def width(self):
        """Difference between the corner columns (``rd.x - lu.x``)."""
        return self.rd.x - self.lu.x

    @property
    def height(self):
        """Difference between the corner rows (``rd.y - lu.y``)."""
        return self.rd.y - self.lu.y

    def _generate_room(self):
        """Create, cache and return a room spanning the whole leaf."""
        #TODO create random sized room in the leaf
        room = Room(self.lu, self.rd)
        self._room = room
        return room

class Map:
    """Dungeon map generated with binary space partitioning.

    The board is a ``height`` x ``width`` grid of tile characters.  Calling
    ``generate()`` splits the map into leaves and draws one room per leaf.

    Parameters:
        width, height: board size in cells.
        min_room_space: a leaf is only split when its extent in the split
            direction is at least twice this value.
        split_threshold: when one side of a leaf is at least this many times
            the other, the leaf is split across its longer side; otherwise
            the direction is chosen at random.
    """

    def __init__(self, width, height, min_room_space=10, split_threshold=1.25):
        self.width = width
        self.height = height
        self.lu = Position(0, 0)
        self.rd = Position(height-1, width-1)
        self.min_room_space = min_room_space
        self.split_threshold = split_threshold
        self._leaves = None
        self.board = self._blank_board()

    @property
    def leaves(self):
        """Leaves produced by the last ``generate()`` call (``None`` before)."""
        return self._leaves

    def generate(self):
        """Reset the board, partition the map and draw every leaf's room."""
        self.board = self._blank_board()
        self._generate_leaves()
        for leaf in self.leaves:
            for pos in leaf.room.position_map:
                self.board[pos.y][pos.x] = Tile.EMPTY.value
            # Borders are drawn after the interior so walls always win.
            for pos in leaf.room.border_map:
                self.board[pos.y][pos.x] = Tile.WALL.value

        #TODO Create corridors by adding corridors from the children up to the highest in the tree

    def _blank_board(self):
        """Return a fresh board filled with rock; every row is its own list."""
        return [[Tile.ROCK.value] * (self.width) for _ in range(self.height)]

    def _generate_leaves(self):
        """Split breadth-first from the root until no leaf can be split."""
        splitable = Queue()
        splitable.put(Leaf(self.lu, self.rd, None, self.min_room_space))
        leaves = []
        while not splitable.empty():
            leaf = splitable.get()
            leaf_a, leaf_b = self._split(leaf)

            if leaf_a is None and leaf_b is None:
                # Could not be split any further: this is a final leaf.
                leaves.append(leaf)
            else:
                leaf.children = (leaf_a, leaf_b)
                leaf_a.sibling = leaf_b
                leaf_b.sibling = leaf_a
                splitable.put(leaf_a)
                splitable.put(leaf_b)

        self._leaves = leaves

    def _split(self, leaf):
        """Pick a split direction for ``leaf`` and try to split it.

        Returns the two new leaves, or ``(None, None)`` when the leaf is too
        small to split in the chosen direction.
        """
        # Compare by multiplication rather than width / height so that a
        # leaf with a zero extent cannot raise ZeroDivisionError.
        if leaf.width >= self.split_threshold * leaf.height:
            return self._split_room(leaf, Split.HORIZONTAL)
        elif leaf.height >= self.split_threshold * leaf.width:
            return self._split_room(leaf, Split.VERTICAL)
        else:
            return self._split_room(leaf, choice([Split.VERTICAL, Split.HORIZONTAL]))

    def _split_room(self, leaf, direction):
        """Split ``leaf`` at a random position along ``direction``.

        Both halves keep at least ``min_room_space`` in the split direction;
        the second half starts one cell after the split position.  Returns
        ``(None, None)`` if the leaf is smaller than ``2 * min_room_space``
        in that direction.
        """
        leaf_a = leaf_b = None
        if direction == Split.VERTICAL:
            if not leaf.height < self.min_room_space * 2:
                random_split = randint(leaf.lu.y + self.min_room_space, leaf.rd.y - self.min_room_space)
                leaf_a = Leaf(leaf.lu,
                              Position(random_split, leaf.rd.x),
                              leaf,
                              self.min_room_space)
                leaf_b = Leaf(Position(random_split + 1, leaf.lu.x),
                              leaf.rd,
                              leaf,
                              self.min_room_space)
        elif direction == Split.HORIZONTAL:
            if not leaf.width < self.min_room_space * 2:
                random_split = randint(leaf.lu.x + self.min_room_space, leaf.rd.x - self.min_room_space)
                leaf_a = Leaf(leaf.lu,
                              Position(leaf.rd.y, random_split),
                              leaf,
                              self.min_room_space)
                leaf_b = Leaf(Position(leaf.lu.y, random_split + 1),
                              leaf.rd,
                              leaf,
                              self.min_room_space)
        return leaf_a, leaf_b

    def __str__(self):
        """Render the board as text, one line per board row."""
        return "\n".join("".join(b) for b in self.board)

if __name__ == "__main__":
    # Demo: build a 50x50 dungeon with a minimum room space of 10 and show it.
    dungeon = Map(width=50, height=50, min_room_space=10)
    dungeon.generate()
    print(dungeon)

Example Output

##################################################
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ##         #
#              ##        ##           ############
#              ##        ##           ############
#              ##        ##           ##         #
#              ##        ##           ##         #
########################################         #
########################################         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ##         #
#          ##         ##              ############
#          ##         ##              ############
#          ##         ##              ##         #
#          ##         ##              ##         #
########################################         #
########################################         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
#           ##              ##        ##         #
##################################################

Please critique any and all


Get this bounty!!!

#StackBounty: #python #python-3.x #python-multiprocessing Avoid global variables for unpicklable shared state among multiprocessing.Poo…

Bounty: 100

I frequently find myself writing programs in Python that construct a large (megabytes) read-only data structure and then use that data structure to analyze a very large (hundreds of megabytes in total) list of small records. Each of the records can be analyzed in parallel, so a natural pattern is to set up the read-only data structure and assign it to a global variable, then create a multiprocessing.Pool (which implicitly copies the data structure into each worker process, via fork) and then use imap_unordered to crunch the records in parallel. The skeleton of this pattern tends to look like this:

# Module-level slot for the shared classifier.  classify() assigns it before
# creating the worker pool and resets it to None afterwards.
classifier = None
def classify_row(row):
    """Classify one record using the module-level ``classifier``.

    This is the function handed to ``Pool.imap_unordered``; it relies on
    ``classifier`` having been set by ``classify()``.
    """
    return classifier.classify(row)

def classify(classifier_spec, data_file):
    """Yield classification results for every row of ``data_file``.

    Builds a ``Classifier`` from ``classifier_spec``, stores it in the module
    global ``classifier`` and only then creates the worker pool, so that
    ``classify_row`` can reach it from the workers (this presumably depends
    on the pool being started via fork -- confirm for other start methods).
    Rows are read with ``csv.DictReader`` and results are yielded in
    completion order, not input order.  The global is reset to ``None`` when
    the generator finishes or is closed.
    """
    global classifier
    try:
        classifier = Classifier(classifier_spec)
        # The backslash continues the ``with`` statement onto the next line.
        with open(data_file, "rt") as fp, \
             multiprocessing.Pool() as pool:
            rd = csv.DictReader(fp)
            yield from pool.imap_unordered(classify_row, rd)
    finally:
        classifier = None

I’m not happy with this because of the global variable and the implicit coupling between classify and classify_row. Ideally, I would like to write

def classify(classifier_spec, data_file):
    """Yield classification results for every row of ``data_file``.

    Variant without a global: the bound method ``classifier.classify`` is
    passed straight to ``Pool.imap_unordered``, which requires the
    ``Classifier`` instance to be picklable.  Results are yielded in
    completion order, not input order.
    """
    classifier = Classifier(classifier_spec)
    # The backslash continues the ``with`` statement onto the next line.
    with open(data_file, "rt") as fp, \
         multiprocessing.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)

but this does not work, because the Classifier object usually contains objects which cannot be pickled (because they are defined by extension modules whose authors didn’t care about that); I have also read that it would be really slow if it did work, because the Classifier object would get copied into the worker processes on every invocation of the bound method.

Is there a better alternative? I only care about 3.x.


Get this bounty!!!

#StackBounty: #python-3.x #pandas #ssh-tunnel after connect to remote database script doesn't exit

Bounty: 50

I connect to remote database. I’m using sshtunnel for this. I have no problem to connect to DB and get access to data.

My problem is that, my script doesn’t exit after everything.
I create the connection, download the data, print it, stop the SSH tunnel, and print ‘stop’. The script hangs at the line server.stop() and never prints ‘stop’; I have to interrupt it manually to make it terminate.

This is code:

from sshtunnel import SSHTunnelForwarder
from sqlalchemy import create_engine
import pandas as pd


# Open an SSH tunnel to the remote host and forward a local port to the
# MySQL port (3306) on the remote side.
server = SSHTunnelForwarder(
    ('host', 22),
    ssh_password='password',
    ssh_username='username',
    remote_bind_address=('127.0.0.1', 3306)
)
server.start()

# Connect through the tunnel: the engine targets the locally bound port that
# the forwarder picked.
engine = create_engine(
    'mysql+mysqldb://db_user:db_pass@127.0.0.1:{}/temps'.format(server.local_bind_port))

query = 'SELECT * FROM temp'

# Load the whole result set into a DataFrame and show its first and last rows.
df = pd.read_sql(query, engine)
print(df.head())
print(df.tail())

# NOTE(review): the engine is never disposed before the tunnel is stopped;
# presumably a pooled database connection is still open through the tunnel
# here, which may be what keeps stop() from returning -- confirm.
server.stop()
print('stop')

This script doesn’t print ‘stop’.

Question: Why does this script not terminate?


Get this bounty!!!

#StackBounty: #python-3.x #postgresql #sqlalchemy #psycopg2 #python-multiprocessing multiprocessing / psycopg2 TypeError: can't pic…

Bounty: 100

I followed the below code in order to implement a parallel select query on a postgres database:

https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/

My basic problem is that I have ~6k queries that need to be executed, and I am trying to optimise the execution of these select queries. Initially it was a single query whose where id in (...) clause contained all 6k predicate IDs, but I ran into issues with the query using up > 4GB of RAM on the machine it ran on, so I decided to split it out into 6k individual queries which, when run synchronously, keep memory usage steady. However, it takes a lot longer to run, which is less of an issue for my use case. Even so, I am trying to reduce the time as much as possible.

This is what my code looks like:

class PostgresConnector(object):
    """Runs batches of SQL queries against Postgres in a process pool.

    Each instance owns a SQLAlchemy engine and a ``multiprocessing.Pool``
    sized to the machine's CPU count.
    """

    def __init__(self, db_url):
        self.db_url = db_url
        self.engine = self.init_connection()
        self.pool = self.init_pool()

    def init_pool(self):
        """Create a worker pool with one process per CPU."""
        CPUS = multiprocessing.cpu_count()
        return multiprocessing.Pool(CPUS)

    def init_connection(self):
        """Create the SQLAlchemy engine for ``self.db_url``."""
        LOGGER.info('Creating Postgres engine')
        return create_engine(self.db_url)

    def run_parallel_queries(self, queries):
        """Execute ``queries`` on the pool and return all rows as one flat list.

        Results arrive in completion order.  Any exception is logged and
        re-raised.

        NOTE: the pool is closed and joined in ``finally``; a closed pool
        accepts no further tasks, so this method can only be used once per
        instance.
        NOTE(review): passing the bound method sends ``self`` (including the
        engine and the pool) to the workers via pickle.
        """
        results = []
        try:
            results.extend(
                self.pool.imap_unordered(self.execute_parallel_query, queries))
        except Exception as exception:
            LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
            raise
        finally:
            self.pool.close()
            self.pool.join()

        LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))

        return list(chain.from_iterable(results))

    def execute_parallel_query(self, query):
        """Run one query on a fresh psycopg2 connection and return its rows."""
        con = psycopg2.connect(self.db_url)
        try:
            cur = con.cursor()
            cur.execute(query)
            records = cur.fetchall()
        finally:
            # Always release the connection, even when the query fails.
            con.close()

        return list(records)

However whenever this runs, I get the following error:

TypeError: can't pickle _thread.RLock objects

I’ve read lots of similar questions regarding the use of multiprocessing and pickleable objects but I cant for the life of me figure out what I am doing wrong.

The pool is generally one per process (which I believe is the best practise) but shared per instance of the connector class so that its not creating a pool for each use of the parallel_query method.

The top answer to a similar question:

Accessing a MySQL connection pool from Python multiprocessing

Shows an almost identical implementation to my own, except using MySql instead of Postgres.

Am I doing something wrong?

Thanks!

EDIT:

I’ve found this answer:

Python Postgres psycopg2 ThreadedConnectionPool exhausted

which is incredibly detailed and looks as though I have misunderstood what multiprocessing.Pool vs a connection pool such as ThreadedConnectionPool gives me. However in the first link it doesn’t mention needing any connection pools etc. This solution seems good but seems A LOT of code for what I think is a fairly simple problem?

EDIT 2:

So the above link solves another problem, which I would have likely run into anyway so I’m glad I found that, but it doesnt solve the initial issue of not being able to use imap_unordered down to the pickling error. Very frustrating.


Get this bounty!!!

#StackBounty: #python-3.x #oop #design-patterns #abstract-class Python design patterns: Nested Abstract Classes

Bounty: 50

In the following example code, I want every car object to be composed of brake_system and engine_system objects, which are stored as attributes on the car.

To implement this, I’ve defined Car, BrakeSystem and EngineSystem as abstract classes. Every subclass of Car is required to define its respective BrakeSystem and EngineSystem subclasses as class attributes.

Are there any potential problems with this approach? Or are there other design patterns better suited to handle nested abstractions?

from abc import ABC, abstractmethod

class Car(ABC):
    """
    Abstract car that is composed of an engine system and a brake system.

    Concrete subclasses provide ``_engine_type`` and ``_brake_type`` (for
    example as class attributes holding the component classes); they are
    called without arguments when the car is constructed.
    """

    @property
    @abstractmethod
    def _brake_type(self):
        """Callable that builds this car's brake system."""
        raise NotImplementedError

    @property
    @abstractmethod
    def _engine_type(self):
        """Callable that builds this car's engine system."""
        raise NotImplementedError

    def __init__(self):
        # Engine first, then brakes.
        self.engine_system = self._engine_type()
        self.brake_system = self._brake_type()

class EngineSystem(ABC):
    """Base class for engine systems; it declares no members of its own."""
    pass

class BrakeSystem(ABC):
    """Base class for brake systems; it declares no members of its own."""
    pass


class FooBrakeSystem(BrakeSystem):
    """Example brake system used by FooCar."""
    pass

class FooEngineSystem(EngineSystem):
    """Example engine system used by FooCar."""
    pass

class FooCar(Car):
    """Concrete car wired to the Foo engine and brake system classes."""
    # Plain class attributes override Car's abstract properties of the same
    # names, which makes FooCar instantiable.
    _engine_type = FooEngineSystem
    _brake_type = FooBrakeSystem

if __name__ == '__main__':
    # Car.__init__ instantiates FooEngineSystem and FooBrakeSystem here.
    obj = FooCar()


Get this bounty!!!

#StackBounty: #python #python-3.x #console #linux #installer Systemd service configuration helper script

Bounty: 50

I’ve written a script that semi-automates the process of configuring/creating a new systemd service.

The script is working perfectly well, however, I’ve had some trouble with the styling and readability, as well as with the argparse logic (argument parsing works well, but there is just too much to handle manually and not automatically with argparse, because I have many arguments and they have different requirements) – I don’t know if what I’ve done is best practice, because to me right now, it looks like a very poor idea.

The script right now loads a default schema (template) and allows the user to interactively configure a service using the options that are present in that template.

Notes: Version of Python used is 3.5.
There are some single cases of use where bugs might appear, however, the script is working correctly overall and the existence of such possible bugs should be ignored for the sake of this question, as they are not for here.

Available arguments for the script:

-c/--schema: Just loads a different template configuration to use 
for the interactive "asking" process. Takes the path to that template.

-s/--short: Uses a default schema (template) that is shorter than the 
original one (has less and more basic options)

-x/--extended: Uses a default schema (template) that is longer than the 
original one (has more options that are more complex)

-d/--directory: The directory in which the script should 
save the service unit file, by default "/etc/systemd/system".

--delete: Deletes the service unit file of the specified service name and 
disables/stops that service. If used, should be the only argument used 
along with service_name.

--edit: Edits the service unit file (opens an editor) 
of the specified service name. If used, should be the only argument used 
along with service_name.

-b/--build: Builds a default service configuration unit file as an example. 
Should be the ONLY argument used, even the 
positional argument service_name should not be used.

--info: Outputs information about the script - purpose, maintainer, etc. 
Again should be the ONLY argument used.

service_name: The positional argument which tells what is the name 
of the service that has to be configured/edited/deleted. 
Doesn't have to be used with --build/--info.

The script:

#!/usr/bin/env python3

"""
This is a helper script that semi-automates the process
of configuring/editing systemd services.

Examples --
Create a service using a short preset configuration:
sudo ./service-config.py --short some_service_name

Create a service using a custom preset configuration template:
sudo ./service-config.py -c some_schema some_service_name

Edit an existing service:
sudo ./service-config.py --edit some_service_name

# To do list:
1. Document the script.
2. Fix permission issues with some of the arguments.
3. Add a configuration file for the script for 
   some of the default values and options. (?)
4. Allow the user to choose an editor.

NOTES:
1. This script requires root privileges for most use-cases.
2. Skipping a configuration option (just pressing enter) without
   supplying any value will just tell the script to skip that option.
"""

# IMPORTS:

import argparse
import configparser
import datetime
import os
import subprocess
import sys
import time

from collections import OrderedDict

# DEFINING CONSTANTS:
# General script metadata and default paths.

VERSION             = "0.4"
MAINTAINER_NICK     = "..."
MAINTAINER_EMAIL    = "...@gmail.com"
TRACE               = True
# Default value of the -c/--schema option.
SCHEMA              = "schemas/service-config"
# Presumably the templates selected by -s/--short and -x/--extended.
SCHEMA_SHORT        = "schemas/short_service-config"
SCHEMA_EXTENDED     = "schemas/extended_service-config"
CONFIG              = None # for future uses
# Default value of the -d/--directory option.
OUTPUT_DIR          = "/etc/systemd/system"

# ERRORS:
# Process exit codes; ARGPARSE_ERR is passed to sys.exit() when argument
# parsing or validation fails.

USER_ABORT          = 5
ARGPARSE_ERR        = 6
CONFIGURATION_ERR   = 7
GLOBAL_ERR          = 8
SCHEMA_ERR          = 9
SYSTEMD_ERR         = 10
UID_ERROR           = 11
FINISH_ERROR        = 12

# DEFAULTS:

# Editor command launched by edit().
DEFAULT_EDITOR           = "vim"
# Path named in the --build help text as the location of the default schema.
DEFAULT_BUILD_SCHEMA     = "schemas/default-schema"

# Default values for individual unit-file options (presumably the ones
# written by --build; their use is not visible in this part of the script).
DEFAULT_DESCRIPTION      = "Example"
DEFAULT_AFTER            = "network.target"
DEFAULT_TYPE             = "simple"
DEFAULT_USER             = "root"
DEFAULT_GROUP            = "root"
DEFAULT_EXEC_START       = "/bin/true"
DEFAULT_EXEC_STOP        = "/bin/true"
DEFAULT_KILL_MODE        = "control-group"
DEFAULT_KILL_SIGNAL      = "SIGTERM"
DEFAULT_PID_FILE         = "/run/service.pid"
DEFAULT_RESTART          = "on-failure"
DEFAULT_RESTART_SEC      = "2"
DEFAULT_TIMEOUT_STOP_SEC = "5"
DEFAULT_DYNAMIC_USER     = "no"
DEFAULT_ENVIRONMENT_FILE = "/etc/service/env"
DEFAULT_STANDARD_OUTPUT  = "journal"
DEFAULT_STANDARD_ERROR   = "journal"
DEFAULT_WANTED_BY        = "multi-user.target"

# COLORS AND Formatting:

def tty_supports_ansi():
    """Checks whether ANSI codes should be emitted on standard output.

    Returns True when ``sys.stdout`` is attached to a terminal, or when the
    ``TERM`` environment variable equals ``"ANSI"``; otherwise False.

    Only ``sys.stdout`` is inspected.  The earlier loop over stdout and
    stderr returned during its first iteration, so stderr was never
    consulted; the dead iteration is removed and the behaviour is unchanged.
    """
    stream = sys.stdout
    if hasattr(stream, "isatty") and stream.isatty():
        return True
    return os.environ.get("TERM") == "ANSI"

class Formatting:
    """
    A class containing constants with most Formatting/basic colors
    for Unix-based terminals and vtys.
    Does NOT work on Windows cmd, PowerShell, and their varieties!

    Every constant is an ANSI SGR sequence: the ESC character (octal 033)
    followed by "[<code>m".
    """

    RESET            = "\033[0m"
    BOLD             = "\033[1m"
    DIM              = "\033[2m"
    ITALIC           = "\033[3m"
    UNDERLINE        = "\033[4m"
    BLINK            = "\033[5m" # Doesn't work on some terminals.
    INVERT           = "\033[7m"
    HIDDEN           = "\033[8m"
    FG_DEFAULT       = "\033[39m"
    FG_BLACK         = "\033[30m"
    FG_RED           = "\033[31m"
    FG_GREEN         = "\033[32m"
    FG_YELLOW        = "\033[33m"
    FG_BLUE          = "\033[34m"
    FG_MAGENTA       = "\033[35m"
    FG_CYAN          = "\033[36m"
    FG_LIGHT_GRAY    = "\033[37m"
    FG_DARK_GRAY     = "\033[90m"
    FG_LIGHT_RED     = "\033[91m"
    FG_LIGHT_GREEN   = "\033[92m"
    FG_LIGHT_YELLOW  = "\033[93m"
    FG_LIGHT_BLUE    = "\033[94m"
    FG_LIGHT_MAGENTA = "\033[95m"
    FG_LIGHT_CYAN    = "\033[96m"
    FG_WHITE         = "\033[97m"
    BG_DEFAULT       = "\033[49m"
    BG_BLACK         = "\033[40m"
    BG_RED           = "\033[41m"
    BG_GREEN         = "\033[42m"
    BG_YELLOW        = "\033[43m"
    BG_BLUE          = "\033[44m"
    BG_MAGENTA       = "\033[45m"
    BG_CYAN          = "\033[46m"
    BG_LIGHT_GRAY    = "\033[47m"
    BG_DARK_GRAY     = "\033[100m"
    BG_LIGHT_RED     = "\033[101m"
    BG_LIGHT_GREEN   = "\033[102m"
    BG_LIGHT_YELLOW  = "\033[103m"
    BG_LIGHT_BLUE    = "\033[104m"
    BG_LIGHT_MAGENTA = "\033[105m"
    BG_LIGHT_CYAN    = "\033[106m"
    BG_WHITE         = "\033[107m"

    def __init__(self):
        # Decided once at construction time.
        self.is_supported = tty_supports_ansi()

    def ansi(self, ansi_key):
        """
        The format method for this class. Returns the proper
        chosen formatting if it is supported, else an empty string.
        Takes ansi_key as argument, where ansi_key is the name of one
        of the constants defined in the class (e.g. "BOLD").
        An unknown name raises AttributeError when formatting is supported.
        """
        if self.is_supported:
            return getattr(self, ansi_key)
        else:
            return ""

# The formatting/color class handler variable.
# Created once at import time, so ANSI support is detected a single time and
# shared by every function that formats output through FTY.ansi().

FTY = Formatting()

# Code starts from here:

def printf(text, f="RESET", **kwargs):
    """
    A print function with formatting.
    Always prints on stdout.
    As arguments takes:
    1. string (text to print)
    2. formatting type (bold, italic, etc.) - the case-insensitive name
       of one of the Formatting constants
    3+. kwargs passed to the print function.

    The formatting is switched off again after the text, so it cannot
    leak into whatever is written to the terminal next (for example the
    shell prompt after the script exits).
    """

    prefix = FTY.ansi(f.upper())
    # Empty string when ANSI is unsupported, so plain output is unchanged.
    suffix = FTY.ansi("RESET")

    print("{}{}{}".format(prefix, text, suffix), file=sys.stdout, **kwargs)


def print_info():
    """Show the script's purpose and maintainer contact, then exit with status 0."""

    green = FTY.ansi("FG_GREEN")
    reset = FTY.ansi("RESET")

    printf("This is a helper script for configuring systemd services.", f="bold")
    # Labels are green, the values use the terminal's default formatting.
    for line in (
        "{}Maintainer: {}{}".format(green, reset, MAINTAINER_NICK),
        "{}Email: {}{}".format(green, reset, MAINTAINER_EMAIL),
    ):
        printf(line)

    sys.exit(0)

def parse_arg():
    """
    Build the argument parser, parse the command line and validate it.

    Returns the parsed namespace.  The combination of arguments is checked
    by check_parser_opts(); if argparse.ArgumentError is raised the script
    exits with ARGPARSE_ERR.
    """

    parser = argparse.ArgumentParser(description="Systemd services configuration script")

    # Three independent mutually exclusive groups:
    #   standalone - --info / --build / the positional service_name
    #   preset     - --short / --extended
    #   action     - --schema / --edit / --directory / --delete
    standalone = parser.add_mutually_exclusive_group()
    preset = parser.add_mutually_exclusive_group()
    action = parser.add_mutually_exclusive_group()

    # Arguments are registered in the same order as before so that the
    # generated help text keeps its layout.
    action.add_argument(
        "-c", "--schema",
        type=str,
        default=SCHEMA,
        help="Choose a custom schema and load defaults from it.",
    )
    action.add_argument(
        "--edit",
        action="store_true",
        help="Directly edit a systemd unit file.",
    )
    standalone.add_argument(
        "--info",
        action="store_true",
        help="Show information about the script.",
    )
    standalone.add_argument(
        "-b", "--build",
        action="store_true",
        help="Builds a default schema in schemas/default-schema",
    )
    preset.add_argument(
        "-s", "--short",
        action="store_true",
        help="Use a short configuration schema.",
    )
    preset.add_argument(
        "-x", "--extended",
        action="store_true",
        help="Use a long configuration schema.",
    )
    action.add_argument(
        "-d", "--directory",
        type=str,
        default=OUTPUT_DIR,
        help="Output directory for the service unit file.",
    )
    action.add_argument(
        "--delete",
        action="store_true",
        help="Delete the specified service's configuration file.",
    )
    standalone.add_argument(
        "service_name",
        type=str,
        nargs='?',
        help="The name of the service to configure/edit.",
    )

    try:
        parsed = parser.parse_args()
        check_parser_opts(parsed)
    except argparse.ArgumentError:
        print("Error: An error occured while parsing your arguments.", file=sys.stderr)
        sys.exit(ARGPARSE_ERR)

    return parsed

def check_parser_opts(args):
    """
    Check if all supplied arguments are used correctly.
    Prints one message per illegal combination and exits with
    ARGPARSE_ERR if at least one was found; returns None otherwise.

    --build and --info must be used alone; --delete and --edit may only
    be accompanied by service_name.  directory and schema are never
    reported because they always carry a default value.
    """

    all_args = [
        'build',
        'service_name',
        'info',
        'short',
        'extended',
        'delete',
        'directory',
        'edit',
        'schema'
    ]

    # Arguments that always have a (default) value and so never conflict.
    always_allowed = {'directory', 'schema'}

    # (attribute on args, label used in the message, permitted companions)
    exclusive_rules = (
        ('build',  '-b/--build', ()),
        ('info',   '--info',     ()),
        ('delete', '--delete',   ('service_name',)),
        ('edit',   '--edit',     ('service_name',)),
    )

    error = False

    for flag, label, companions in exclusive_rules:
        if not getattr(args, flag):
            continue
        permitted = always_allowed.union(companions, (flag,))
        for arg in all_args:
            # Names are compared by value (set membership), not by identity.
            if arg not in permitted and getattr(args, arg):
                print("The argument {} cannot be used with {}.".format(label, arg))
                error = True

    if error:
        printf("{}Error: wrong argument usage, aborting.".format(FTY.ansi("FG_RED")), f="bold")
        sys.exit(ARGPARSE_ERR)

def get_fragment_path(service):
    """
    Returns the path of the systemd service's unit configuration file.

    Runs `systemctl show -p FragmentPath -- <service>` and returns everything
    that follows the first '=' of its output. The result is an empty string
    when the output carries nothing after the '='.

    Raises subprocess.CalledProcessError when systemctl exits with an error.
    """

    # Pass the arguments as a list (no shell) so the service name can never be
    # interpreted as shell syntax; "--" keeps it from being parsed as an option.
    sysctl_out = subprocess.check_output(
        ["systemctl", "show", "-p", "FragmentPath", "--", service]
    )

    # Split on the first '=' only: the path itself may contain '=' characters.
    _, _, filename = sysctl_out.decode('utf-8').strip().partition('=')

    return filename

def _finish_edit(destination):
    """
    Report the result of an edit through the module-level finish().
    Needed because edit() has a parameter that is also called `finish`.
    """

    finish(destination, mode="edit")

def edit(service, manual=False, finish=True):
    """
    Open the service's systemd service 
    unit configuration file for editing.

    service -- a service name, or the full path of the file when manual is True.
    manual  -- treat `service` as a path instead of resolving it via systemd.
    finish  -- when true, report the result and exit once the editor has closed.
    """

    import shlex

    # Check if destination is already set to the full path.
    if manual:
        path = service
    else:
        path = get_fragment_path(service)

    # Open the editor on the configuration file and wait for it to exit.
    # DEFAULT_EDITOR is still interpreted by the shell (as before), but the
    # path is quoted so spaces or shell metacharacters in it stay literal.
    subprocess.call("{} {}".format(DEFAULT_EDITOR, shlex.quote(path)), shell=True)

    # The boolean parameter `finish` hides the finish() function inside this
    # scope, so the call has to go through a helper defined outside of it.
    if finish:
        _finish_edit(path)

def delete(service):
    """
    Stop and disable the given service, remove its unit
    configuration file and reload the systemd daemon.
    """

    unit_path = get_fragment_path(service)

    # Unit files under /lib/systemd/system are only removed after an explicit yes.
    if unit_path.startswith("/lib/systemd/system"):
        print("This is not a user-configured service, do you want to delete it anyway? [y/N]: ")
        answer = input().lower()
        if answer not in ('y', 'yes'):
            print("Aborting...")
            sys.exit(0)

    print("Deleting service...")
    for action in ("stop", "disable"):
        sysctl_service(service, action)
    os.remove(unit_path)
    service_reload()
    print("Deleted service.")

def setup(args):
    """
    Query the systemd version available on the host and verify
    that the script has the permissions the requested run needs.

    Returns the systemd version as an integer. Exits with SYSTEMD_ERR
    when the version command fails, and with UID_ERROR when a non-root
    user writes to the default output directory without building a schema.
    """

    # The version is collected for a future compatibility check.
    try:
        raw_version = subprocess.check_output('systemd --version', shell=True)
        systemd_version = int(raw_version.strip().split()[1])
    except subprocess.CalledProcessError:
        print("Systemd isn't working on your system. Why even use this script?", file=sys.stderr)
        sys.exit(SYSTEMD_ERR)

    is_root = os.getuid() == 0
    needs_root = not args.build and args.directory == OUTPUT_DIR
    if needs_root and not is_root:
        message = "{}Insufficient permissions. You have to run the script as root (with sudo)."
        printf(message.format(FTY.ansi("FG_LIGHT_RED")), f="bold", file=sys.stderr)
        sys.exit(UID_ERROR)

    return systemd_version

def build():
    """
    Write an example unit configuration schema, filled in from
    the DEFAULT_* constants, to DEFAULT_BUILD_SCHEMA.
    """

    # Never overwrite a schema that is already there.
    if os.path.exists(DEFAULT_BUILD_SCHEMA):
        print("Error: {} already exists.".format(DEFAULT_BUILD_SCHEMA), file=sys.stderr)
        sys.exit(SCHEMA_ERR)

    unit_section = OrderedDict([
        ("Description", DEFAULT_DESCRIPTION),
        ("After", DEFAULT_AFTER),
    ])

    service_section = OrderedDict([
        ("Type", DEFAULT_TYPE),
        ("ExecStart", DEFAULT_EXEC_START),
        ("ExecStop", DEFAULT_EXEC_STOP),
        ("Restart", DEFAULT_RESTART),
        ("RestartSec", DEFAULT_RESTART_SEC),
        ("User", DEFAULT_USER),
        ("Group", DEFAULT_GROUP),
        ("PIDFile", DEFAULT_PID_FILE),
        ("EnvironmentFile", DEFAULT_ENVIRONMENT_FILE),
        ("KillMode", DEFAULT_KILL_MODE),
        ("KillSignal", DEFAULT_KILL_SIGNAL),
        ("TimeoutStopSec", DEFAULT_TIMEOUT_STOP_SEC),
        ("StandardOutput", DEFAULT_STANDARD_OUTPUT),
        ("StandardError", DEFAULT_STANDARD_ERROR),
        ("DynamicUser", DEFAULT_DYNAMIC_USER),
    ])

    install_section = OrderedDict([
        ("WantedBy", DEFAULT_WANTED_BY),
    ])

    schema = configparser.ConfigParser()
    # Keep option names exactly as written instead of lower-casing them.
    schema.optionxform = str
    schema['Unit'] = unit_section
    schema['Service'] = service_section
    schema['Install'] = install_section

    with open(DEFAULT_BUILD_SCHEMA, 'w+') as schemafile:
        schema.write(schemafile)

    finish(DEFAULT_BUILD_SCHEMA, mode="build")

def load_schema(schema):
    """
    Read the service unit configuration file and load it.

    schema -- path of the schema file to read.

    Returns a configparser.ConfigParser whose option names keep their
    original case.

    Raises FileNotFoundError when the schema could not be read, instead of
    silently handing back an empty configuration.
    """

    config = configparser.ConfigParser()
    # Keep option names exactly as written instead of lower-casing them.
    config.optionxform = str

    # ConfigParser.read() skips files it cannot open and returns the list of
    # files it actually parsed, so an empty result means nothing was loaded.
    if not config.read(schema):
        raise FileNotFoundError("Schema file could not be read: {}".format(schema))

    return config

def parse_config(cfg):
    """
    Parse the configuration file and return it as dictionaries.

    cfg -- a mapping of section names to sections (e.g. a ConfigParser).

    Returns an argparse.Namespace with one attribute per section. The Unit,
    Service and Install attributes are converted to OrderedDicts; Install
    becomes an empty OrderedDict when the schema has no [Install] section.
    """

    config         = argparse.Namespace(**OrderedDict(cfg))
    config.Unit    = OrderedDict(config.Unit)
    config.Service = OrderedDict(config.Service)
    # A schema without an [Install] section must not crash the parser.
    config.Install = OrderedDict(getattr(config, "Install", {}))

    return config

def write_config(cfg, destination):
    """
    Save the unit configuration file to the destination.

    cfg         -- object with Unit, Service and Install mapping attributes.
    destination -- path of the unit file to write (overwritten if present).

    The [Install] section is only written when cfg.Install is non-empty.
    """

    config = configparser.ConfigParser()
    # Keep option names exactly as written instead of lower-casing them.
    config.optionxform = str
    config['Unit'] = cfg.Unit
    config['Service'] = cfg.Service
    if cfg.Install:
        config['Install'] = cfg.Install
    with open(destination, 'w') as unitfile:
        config.write(unitfile)
        # Terminate the trailer with a real newline (the escape was missing).
        unitfile.write("# Automatically generated by service-config.\n")

def _prompt_for_keys(section):
    """
    Show a "key=" prompt for every key of the section and
    store what the user types as that key's value, in place.
    """

    for key in section:
        printf("{}{}={}".format(FTY.ansi("FG_GREEN"), key, FTY.ansi("RESET")), f="bold", end="")
        section[key] = input()

def user_configuration(config):
    """
    Walk the user through the Unit, Service and Install sections,
    asking for a value for every key, and return the configuration
    with all keys that were left empty removed.
    """

    user_config = config

    # Ask for the [Unit] section's keys.
    printf("{}[Unit] section configuration:".format(FTY.ansi("FG_YELLOW")), f="bold")
    _prompt_for_keys(user_config.Unit)

    # Ask for the [Service] section's keys.
    print()
    printf("{}[Service] section configuration:".format(FTY.ansi("FG_BLUE")), f="bold")
    _prompt_for_keys(user_config.Service)

    # Ask for the [Install] section's keys.
    print()
    printf("{}[Install] section configuration:".format(FTY.ansi("FG_MAGENTA")), f="bold")
    _prompt_for_keys(user_config.Install)

    # Drop every key whose answer was empty, once all answers are in.
    for section_name in ("Unit", "Service", "Install"):
        answers = getattr(user_config, section_name)
        kept = {key: value for key, value in answers.items() if value}
        setattr(user_config, section_name, kept)

    return user_config

def service_reload():
    """
    Run `systemctl daemon-reload`; the exit status is ignored.
    """

    reload_command = ["systemctl", "daemon-reload"]
    subprocess.call(reload_command)

def sysctl_service(service, action):
    """
    A simple systemctl wrapper for service management.

    service -- name of the unit to act on.
    action  -- systemctl verb to run, e.g. "stop", "disable", "enable", "start".

    The exit status of systemctl is ignored.
    """

    # Pass the arguments as a list (no shell) so neither value can be
    # interpreted as shell syntax; "--" keeps the unit name from being
    # parsed as an option.
    subprocess.call(["systemctl", action, "--", service])

def finish(destination, mode="create"):
    """
    Exit the script, reporting success when the destination file
    exists and failure (exit code FINISH_ERROR) when it does not.
    """

    if not os.path.exists(destination):
        print("The script failed to finish successfully.")
        sys.exit(FINISH_ERROR)

    # Message template and colour name per mode; unknown modes print nothing.
    reports = {
        "create": ("{}Service created successfully.", "FG_GREEN"),
        "edit": ("{}Service edited successfully.", "FG_YELLOW"),
        "build": ("{}Default schema built successfully.", "FG_BLUE"),
    }
    if mode in reports:
        template, colour = reports[mode]
        print(template.format(FTY.ansi(colour)))
    sys.exit(0)

def _ask_yes_no(prompt, default=False):
    """
    Print the prompt, read one line and return the user's choice.
    An empty answer selects the default; otherwise only "y" (any case) is a yes.
    """

    print(prompt, end="")
    answer = input()
    if not answer:
        return default
    return answer.lower() == "y"

def main():
    """
    The main function that handles the program.

    Dispatches the stand-alone actions (delete, info, build, edit) and
    otherwise runs the interactive creation of a new unit file: load the
    schema, ask the user for values, write the file, then optionally edit,
    enable and start the service.
    """

    # Get the parsed arguments.
    args = parse_arg()

    # Check the version of systemd and check permissions.
    # The returned version is not used yet, so it is not kept.
    setup(args)

    # Exit if service_name contains illegal characters (NUL or a path separator).
    if args.service_name:
        if '\x00' in args.service_name or '/' in args.service_name:
            print("Service name contains symbols that are not allowed.")
            sys.exit(ARGPARSE_ERR)

    if args.delete:
        delete(args.service_name)
        sys.exit(0)
    if args.info:
        print_info()
    if args.build:
        build()
    if args.edit:
        edit(args.service_name)
    if args.short:
        args.schema = SCHEMA_SHORT
        print("Using short schema configuration.")
    if args.extended:
        args.schema = SCHEMA_EXTENDED
        print("Using extended schema configuration.")

    # Load and parse the unit configuration schema.
    schema = load_schema(args.schema)
    config = parse_config(schema)

    # Start interactive configuration, aborts on CTRL-C/CTRL-D.
    try:
        user_config = user_configuration(config)
    except (EOFError, KeyboardInterrupt):
        # Start on a fresh line: the cursor is still behind the last prompt.
        print("\nAborting.")
        sys.exit(USER_ABORT)

    # Check whether the supplied service name ends with .service.
    if not args.service_name.endswith('.service'):
        args.service_name = args.service_name + '.service'

    # Save the configured unit file to the destination directory.
    destination = os.path.join(args.directory, args.service_name)
    write_config(user_config, destination)


    # Interactive section:

    if _ask_yes_no("Do you want to manually edit the new configuration? [y/N]: "):
        print("Opening editor...")
        edit(destination, manual=True, finish=False)
    else:
        print("The configuration file won't be edited.")

    # Allow these options only if we have permissions for them.
    if os.getuid() == 0:
        if _ask_yes_no("Do you want to enable the service? [y/N]: "):
            print("Enabling service...")
            service_reload()
            sysctl_service(args.service_name, "enable")
            print("Service enabled.")
        else:
            print("Service won't be enabled.")


        # Starting is the default here: an empty answer counts as yes.
        if _ask_yes_no("Do you want to start the service? [Y/n]: ", default=True):
            print("Starting service...")
            service_reload()
            sysctl_service(args.service_name, "start")
            print("Service started.")
        else:
            print("Service won't be started.")

    else:
        print("{}No permissions to enable/start service. "
              "Need to run with root privileges.".format(FTY.ansi("FG_RED")))

    finish(destination)


# Name guard for main process.
if __name__ == "__main__":
    # Don't use global error handling if TRACE is set to True,
    # so the full traceback stays visible.
    if TRACE:
        main()
    else:
        try:
            main()
        except Exception as error:
            print("A global exception has been caught.", file=sys.stderr)
            # Print the exception that was actually bound above; the previous
            # name `err` was never defined and raised a NameError here.
            print(error, file=sys.stderr)
            sys.exit(GLOBAL_ERR)

I want this script to be reviewed for styling/formatting and readability. I do want it to be fairly easy to read it, since I won’t be keeping it only to myself.

Questions:

  1. I’ve had issues with styling. There are some “extremely ugly multiline statements” that I’ve made in order to comply with the 100 characters limit I’ve chosen. Such lines are commented, I do believe them to be ugly. Is the way I’ve made them really best practice or is there a better way?
  2. The Formatting class. I have a class that I use to format/colorize the output of my script, however, the ANSI color codes do not work on every terminal, thus I’ve made the class the way it is. Is it good practice to have such a class with constants and return the values of those constants by using a method (that allows me to check if the terminal supports ANSI)? Is the way I’ve gone about coloring/formatting the output a good idea at all, having so many constants for that in my main program and also having a variable to initialize such a class?
  3. The way I check for the argument usage. Sure, I have mutually exclusive groups, but those aren’t nearly enough to help me achieve what I need. So I’ve made a function check_parser_opts(args) that checks whether the arguments are used correctly with a bunch of if statements. Is the way I’ve done it proper, how could I go about this? Or should I just try to simplify my arguments and not have this many?
  4. I’ve always wondered how should my functions be ordered. Which function should be in which part of the file, after which function, and etc. Is there any “proper” way to order your functions? Currently I just have my main() function at the bottom and I just put everything else above it, without really thinking too much.

Note: For enthusiasts that are interested in the progress of this script, you can follow the github repo.


Get this bounty!!!

#StackBounty: #python #python-3.x #lexer Lexer for Apache logs in Python

Bounty: 100

I recently learned Python using the book by Mark Summerfield, and I am now working through a few katas that I do whenever I learn a new language. This helps me understand “how the language works”, so I wrote a simple lexer for Apache logs.

Some example log records are

64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
64.242.88.10 - - [07/Mar/2004:16:06:51 -0800] "GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

and the program I wrote reads all values as tokens and outputs them using | as a separator, as in

64.242.88.10|-|-|07/Mar/2004:16:05:49 -0800|GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1|401|12846
64.242.88.10|-|-|07/Mar/2004:16:06:51 -0800|GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1|200|4523
64.242.88.10|-|-|07/Mar/2004:16:10:02 -0800|GET /mailman/listinfo/hsdivision HTTP/1.1|200|6291

The lexer can be used for more general usage though.

Here is my code and I would love to read your comments and suggestions about how to improve it. I have also raised a few specific questions on certain topics.

import collections
import io
import sys

# States of the lexer: what kind of token, if any, is currently being read.
(
    LEXER_SKIP_WHITESPACE,
    LEXER_READ_QUOTED_STRING,
    LEXER_READ_BRACKETED_STRING,
    LEXER_READ_WORD,
) = range(4)

class Location:
    """
    Position inside a named character stream.

    name -- label of the stream ("<input>" when none is given).
    pos  -- number of characters consumed so far.
    line -- current line number.
    col  -- number of characters consumed on the current line.
    """

    def __init__(self, name=None, pos=0, line=1, col=0):
        self.name = name or "<input>"
        self.line = line
        self.col = col
        self.pos = pos

    def update(self, c):
        """Advance the position past the character c."""
        self.pos += 1
        # Only a real newline starts a new line (the escape was missing, so
        # every letter "n" used to be counted as a line break).
        if c == "\n":
            self.line += 1
            self.col = 0
        else:
            self.col += 1

    def __repr__(self):
        return "Location({!r}, {!r}, {!r}, {!r})".format(
            self.name, self.pos, self.line, self.col)

    def __str__(self):
        return "{}: {}: line {}, column {}".format(
            self.name, self.pos, self.line, self.col)

def readchar(inputchannel, location):
    """
    Yield the characters of inputchannel one at a time until a read
    returns the empty string, updating location before each yield.
    """
    for char in iter(lambda: inputchannel.read(1), ''):
        location.update(char)
        yield char

class LexerError(Exception):
    """Raised when the character stream cannot be split into tokens."""


def readtoken(inputchannel, location):
    """
    Yield the tokens found in inputchannel, updating location as it reads.

    Three kinds of tokens are recognised, each yielded without its delimiters:
    whitespace-separated words, "double-quoted" strings and [bracketed]
    strings. Every newline outside of a quoted or bracketed string is
    yielded as a separate "\n" token.

    Raises LexerError when the stream ends inside a quoted or bracketed string.
    """
    state = LEXER_SKIP_WHITESPACE
    token = ''
    for nextchar in readchar(inputchannel, location):
        # States are plain integers: compare them by value, not by identity.
        if state == LEXER_SKIP_WHITESPACE:
            if nextchar == "\n":
                yield "\n"
            elif nextchar.isspace():
                pass
            elif nextchar == '"':
                state = LEXER_READ_QUOTED_STRING
            elif nextchar == '[':
                state = LEXER_READ_BRACKETED_STRING
            else:
                state = LEXER_READ_WORD
                token += nextchar
        elif state == LEXER_READ_QUOTED_STRING:
            if nextchar == '"':
                yield token
                token = ''
                state = LEXER_SKIP_WHITESPACE
            else:
                token += nextchar
        elif state == LEXER_READ_BRACKETED_STRING:
            if nextchar == ']':
                yield token
                token = ''
                state = LEXER_SKIP_WHITESPACE
            else:
                token += nextchar
        elif state == LEXER_READ_WORD:
            if nextchar.isspace():
                # Any whitespace ends the word; a newline is a token of its own.
                yield token
                token = ''
                state = LEXER_SKIP_WHITESPACE
                if nextchar == "\n":
                    yield "\n"
            else:
                token += nextchar
        else:
            raise LexerError("Impossible lexer state.")

    # End of stream: flush a pending word, reject an unterminated string.
    if state == LEXER_READ_WORD:
        yield token
    elif state == LEXER_READ_QUOTED_STRING:
        raise LexerError("End of character stream in quoted string.")
    elif state == LEXER_READ_BRACKETED_STRING:
        raise LexerError("End of character stream in bracketed string.")
    elif state != LEXER_SKIP_WHITESPACE:
        raise LexerError("Impossible lexer state.")


class Lexer:
    """
    Iterable view of an input channel: iterating a Lexer
    yields the tokens that readtoken() finds in the channel.
    """

    def __init__(self, inputchannel, _location=None):
        # Fall back to a fresh location at the start of "<input>".
        if not _location:
            _location = Location("<input>", 0, 1, 0)
        self.inputchannel = inputchannel
        self.location = _location
        self.state = LEXER_SKIP_WHITESPACE
        self.buf = ''

    def __iter__(self):
        token_stream = readtoken(self.inputchannel, self.location)
        return token_stream

if __name__ == "__main__":
    # Copy stdin to stdout token by token, joining the tokens of each line
    # with '|'. The separator is reset after every newline token so that a
    # line never starts with '|'.
    sep = ''
    for token in Lexer(sys.stdin):
        # Compare against a real newline (the escape was missing).
        if token == '\n':
            sys.stdout.write(token)
            sep = ''
        else:
            sys.stdout.write(sep)
            sys.stdout.write(token)
            sep = '|'

Question 1 – How to write lexers in Python?

I am well aware that there are tools similar to lex and yacc for Python that could have been used here, but that would defeat the purpose of learning how to write such a program in Python. I found it surprisingly hard.

My first misfortune is that Python does not do tail-elimination, so it is essentially forbidden to write a lexer as a set of mutually recursive functions. Mutually recursive functions are one of my favourite tools to do so, because it clearly singles out a specific state of the lexer (the recursive function it is in) and the transitions from this state to the other states, and makes it easy to test-case individually each of the transitions.

Since not everybody may be familiar with lexers based on mutually recursive functions, here is the equivalent of the readtoken generator in OCaml. The beginning of read_token reads like “If you read a double quote, discard it and do read_quotedstring”, and that function itself is defined later and does what one expects: it aggregates characters in a buffer until the next double quote and blesses the result as a token.

(* Lexer written as mutually recursive functions, one per lexer state.
   [f] folds each completed token into the accumulator [ax]; [buffer]
   collects the characters of the token being read; [s] is the input
   character stream. Each function returns the final accumulator. *)
let rec read_token f ((ax,buffer) as state) s =
  match Stream.peek s with
  | Some('"') -> Stream.junk s; read_quotedstring f state s
  | Some('[') -> Stream.junk s; read_timestamp f state s
  | Some(' ')
  | Some('\t')-> Stream.junk s; read_token f state s
  | Some(c) -> read_simpleword f state s
  | None -> ax
(* Reads a word up to the next delimiter; the delimiter itself is left in
   the stream for read_token to handle. *)
and read_simpleword f ((ax,buffer) as state) s =
  match Stream.peek s with
  | Some('"')
  | Some('[')
  | Some(' ')
  | Some('\t') ->
    let current_token = Buffer.contents buffer in
    Buffer.clear buffer;
    read_token f (f ax current_token, buffer) s
  | Some(c) ->
    Buffer.add_char buffer c;
    Stream.junk s;
    read_simpleword f state s
  | None ->
    (* End of stream also ends the word: emit it instead of dropping it. *)
    let current_token = Buffer.contents buffer in
    Buffer.clear buffer;
    f ax current_token
(* Reads up to the closing double quote, which is consumed and discarded. *)
and read_quotedstring f ((ax,buffer) as state) s =
  match Stream.peek(s) with
  | Some('"') ->
    Stream.junk s;
    let current_token = Buffer.contents buffer in
    Buffer.clear buffer;
    read_token f (f ax current_token, buffer) s
  | Some(c) ->
    Stream.junk s;
    Buffer.add_char buffer c;
    read_quotedstring f state s
  | None ->
    failwith "End of stream within a quoted string"
(* Reads up to the closing bracket and passes the text through
   timestamp_to_iso before handing it to [f]. *)
and read_timestamp f ((ax,buffer) as state) s =
  match Stream.peek(s) with
  | Some(']') ->
    Stream.junk s;
    let current_token = Buffer.contents buffer in
    Buffer.clear buffer;
    read_token f (f ax (timestamp_to_iso current_token), buffer) s
  | Some(c) ->
    Stream.junk s;
    Buffer.add_char buffer c;
    read_timestamp f state s
  | None ->
    failwith "End of stream within a bracketed string"

My Python version defines a few states as symbolic constants, as I would have done in C, and then builds a huge loop that keeps track of the transitions. I am not pleased with that implementation, because it does not give me any tool to manage the complexity of the lexer. Working with functions is very unpleasant because of the details of variable scoping in Python. So what would be the idiomatic way to break the lexer down into small testable pieces, which would be important if I decided to write a more complicated parser? Could the idea of representing lexer states by objects be interesting?

Question 2. Is it right to treat stdin as a character stream?

Obviously I somehow did that but Python has no real character type and makes them look like length 1 strings. It feels to me that the approach of reading my input in chunks into “circular expandable buffers” and making copies of substrings of these chunks to generate my tokens would fit much more nicely in the language. Am I right?

Question 3. What is the all-purpose exception in Python?

This seems like a rather basic question, but I am really failing to find a suitable answer in the documentation. The Exception class seems a poor choice, since it is very general and makes error identification and error handling rather complicated.

Question 4. Do generators return none?

Is it good style to return None in generators? Would pass also do?


Get this bounty!!!

#StackBounty: #python #python-3.x #mvc #asynchronous #url-routing Python (Sanic) Routing

Bounty: 50

I am new to Python and wanted to explore how I can associate routes with “controllers” with Sanic as a base. It works fine but “feels” a bit clunky but I can’t put my finger on why/how.

routes.py – Returning controller methods from a given route

from .index import IndexRoutes
from .clients import ClientRoutes
from Controllers.IndexController import *
from Controllers.ClientController import *

# Route-definition classes whose routes get_routes() collects, in this order.
CLASSES = [
    IndexRoutes,
    ClientRoutes
]

def add_routes(app):
    """
    Register every route returned by get_routes() on the given app.

    Each route method names its controller and handler as strings; the
    controller is looked up among this module's global names and the handler
    is then fetched from it. The URL is the route prefix followed by the
    method's path.

    Raises KeyError when a controller name is not defined in this module and
    AttributeError when the controller lacks the named handler.
    """
    for route in get_routes():
        for m in route['methods']:
            # Resolve the handler by name lookup instead of eval(), so a
            # route definition can never execute arbitrary code.
            root, *attributes = m['controller'].split('.')
            handler = globals()[root]
            for attribute in attributes + [m['method']]:
                handler = getattr(handler, attribute)

            app.add_route(
                handler,
                route['prefix'] + m['path'],
                methods=m['request_methods']
            )


def get_routes(classes=None):
    """
    Collect the route definitions of the given route classes.

    classes -- iterable of route classes; defaults to the module-level CLASSES.
               Each class provides ROUTE_NAME, ROUTE_PREFIX and ROUTE_METHODS,
               the latter listing the names of argument-less callables on the
               class that each return one route-method description.

    Returns a list with one dict per class, holding the keys 'name',
    'methods' (the collected descriptions, in ROUTE_METHODS order) and
    'prefix'.
    """
    if classes is None:
        classes = CLASSES

    return [
        {
            'name': c.ROUTE_NAME,
            'methods': [getattr(c, method)() for method in c.ROUTE_METHODS],
            'prefix': c.ROUTE_PREFIX,
        }
        for c in classes
    ]

client.py – Example of route definitions

class ClientRoutes:
    """
    Route definitions for the client resource.

    Every name listed in ROUTE_METHODS is a static method that returns a
    dict describing one route: the controller and handler names, the path
    below ROUTE_PREFIX and the accepted HTTP request methods.
    """

    ROUTE_NAME = 'clients'
    ROUTE_METHODS = ['index', 'create', 'update', 'single', 'destroy']
    ROUTE_PREFIX = 'clients'

    @staticmethod
    def _route(method, path, request_method):
        """Build the description of one ClientController route."""
        return {
            'controller': 'ClientController',
            'method': method,
            'path': path,
            'request_methods': [request_method]
        }

    # The route methods take no arguments, so they are declared static: they
    # can be called on the class as well as on an instance.
    @staticmethod
    def index():
        return ClientRoutes._route('index', '/', 'GET')

    @staticmethod
    def create():
        return ClientRoutes._route('create', '/', 'POST')

    @staticmethod
    def single():
        return ClientRoutes._route('single', '/<id>', 'GET')

    @staticmethod
    def update():
        return ClientRoutes._route('update', '/<id>', 'PUT')

    @staticmethod
    def destroy():
        return ClientRoutes._route('destroy', '/<id>', 'DELETE')

ClientController.py – Example of controller

from sanic import response
from Database.Model import Model
from Database.Client import Client
from Helpers.transformers import to_dict

class ClientController:
    """
    Request handlers for the client resource.

    The handlers take only the request, so they are declared static and are
    meant to be referenced through the class (ClientController.index, ...).
    """

    @staticmethod
    async def index(request):
        """Respond with all clients as JSON."""
        clients = Model(Client)
        return response.json(to_dict(clients.get()))

    @staticmethod
    async def create(request):
        """
        Save a new client from the JSON body of the request.

        Responds with status 400 and the message 'Name is required' when the
        body is missing, is not a JSON object, or has no non-empty 'name'.
        """
        data = request.json

        # Validate directly instead of raising and catching exceptions:
        # str(KeyError(msg)) wraps the message in quotes, so a missing name
        # and an empty name used to produce two different error bodies.
        name = data.get('name') if isinstance(data, dict) else None
        if not name:
            return response.json('Name is required', 400)

        client = Model(Client)
        return response.json(to_dict(client.save(data)))

        # ... Other methods below, unnecessary for this example


Get this bounty!!!