#StackBounty: #python-3.x #matplotlib #fonts Set matplotlib font by family name when there are multiple fonts with same name

Bounty: 100

How can I tell matplotlib to use a specific variant of a font when they both have the same name and characteristics?

For example, I can tell matplotlib to use Latin Modern Roman:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
from pathlib import Path

import matplotlib as mpl
import PyQt5
mpl.use('Qt5Agg')
import matplotlib.pyplot as plt

mpl.rcParams['font.family'] = 'Latin Modern Roman'
mpl.rcParams['font.weight'] = '400'
mpl.rcParams['font.style'] = 'normal'
mpl.rcParams['font.variant'] = 'normal'
mpl.rcParams['mathtext.fontset'] = 'custom'
mpl.rcParams['mathtext.default'] = 'it'
mpl.rcParams['mathtext.rm'] = 'Latin Modern Roman:normal'
mpl.rcParams['mathtext.it'] = 'Latin Modern Roman:italic'
mpl.rcParams['mathtext.bf'] = 'Latin Modern Roman:bold'

mpl.rcParams['xtick.labelsize'] = 8
mpl.rcParams['ytick.labelsize'] = 8
mpl.rcParams['axes.titlesize'] = 10
mpl.rcParams['axes.labelsize'] = 10

x = np.linspace(0,2*np.pi,100)
y = np.sin(x)

fig1 = plt.figure(figsize=(3, 3*(9/16)), dpi=300)
ax1 = fig1.gca()
ax1.plot(x, y, c='r', linewidth=1.0, zorder=20)
ax1.set_xlabel(r'$x label$')
ax1.set_ylabel(r'$y label$')
fig1.tight_layout(pad=0.15)
plt.show()

OK, that’s nice. But what if I want to use a specific "substyle" of Latin Modern Roman? For example, on my system, when I list the available font entries having name='Latin Modern Roman' and weight=400 and style='normal' and variant='normal' through:

fonts = mpl.font_manager.fontManager.ttflist
for f in fonts:
    if (f.name=='Latin Modern Roman'):
        if f.style == 'normal' and f.weight == 400 and f.variant == 'normal':
            print(f.name)
            print(Path(f.fname).stem)
            print(f.fname)
            print('weight : %s'%str(f.weight))
            print('style : %s'%str(f.style))
            print('stretch : %s'%str(f.stretch))
            print('variant : %s'%str(f.variant))
            print('\n')

I get:

Latin Modern Roman
lmroman9-regular
/usr/share/texmf/fonts/opentype/public/lm/lmroman9-regular.otf
weight : 400
style : normal
stretch : normal
variant : normal

...
...

Latin Modern Roman
lmroman10-regular
/usr/share/texmf/fonts/opentype/public/lm/lmroman10-regular.otf
weight : 400
style : normal
stretch : normal
variant : normal

So how would I tell matplotlib that I want to use lmroman9-regular.otf rather than lmroman10-regular.otf? In mpl.rcParams I can only specify font.family, font.weight, font.style and font.variant, so if the two .otf files have identical values for all of these, how can I tell it to use one .otf rather than the other? The fonts do render differently, for example:

[image: comparison of the two fonts]

I’ve tried referencing the .otf file directly with a matplotlib.font_manager.FontProperties instance, then renaming it, and then pointing font.family to the new name, like this:

prop = mpl.font_manager.FontProperties(fname='/usr/share/fonts/opentype/lmodern/lmroman10-regular.otf')
prop.set_name('Latin Modern Roman Type 10')
mpl.rcParams['font.family'] = 'Latin Modern Roman Type 10'

but then I get "findfont: Font family ['Latin Modern Roman Type 10'] not found. Falling back to DejaVu Sans.", because prop.get_name() still (mysteriously) returns Latin Modern Roman and not Latin Modern Roman Type 10.

How can I tell matplotlib that I want to use one over the other?

Note: I would NOT like to use LaTeX rendering (text.usetex) in matplotlib.

System:

  • Ubuntu-20.04 on WSL2 on Windows 10 Pro 19042
  • Python 3.8.5 / [GCC 9.3.0] on linux
  • matplotlib 3.4.1
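
A possible workaround, sketched under assumptions rather than offered as a confirmed fix: instead of going through font.family, a FontProperties object built with fname can be handed directly to individual text artists, which sidesteps the lookup by family name. As far as I can tell, when fname is set, get_name() reports the family name stored in the font file itself, which would explain why set_name() appeared to have no effect. The sketch uses DejaVu Sans (bundled with matplotlib) as a stand-in so it runs anywhere; font_path is the one value to swap for the path of lmroman9-regular.otf or lmroman10-regular.otf. The Agg backend is also an assumption, chosen so the sketch runs without a display.

```python
import matplotlib
matplotlib.use("Agg")  # headless backend (assumption) so no display is needed
import matplotlib.pyplot as plt
from matplotlib import font_manager

# Stand-in font file: DejaVu Sans ships with matplotlib.
# Replace with the full path of the specific .otf file you want.
font_path = font_manager.findfont("DejaVu Sans")

# A FontProperties tied to one concrete file.
prop = font_manager.FontProperties(fname=font_path)
family_from_file = prop.get_name()

# Renaming a file-backed FontProperties does not change what get_name()
# reports, because the name is read back from the file.
renamed = font_manager.FontProperties(fname=font_path)
renamed.set_name("Some Other Name")
name_after_rename = renamed.get_name()

fig, ax = plt.subplots(figsize=(3, 3 * 9 / 16))
ax.plot([0, 1], [0, 1], c="r", linewidth=1.0)

# Pass the file-backed properties to each text artist explicitly.
xlabel = ax.set_xlabel("x label", fontproperties=prop)
ylabel = ax.set_ylabel("y label", fontproperties=prop)
for tick in ax.get_xticklabels() + ax.get_yticklabels():
    tick.set_fontproperties(prop)

fig.canvas.draw()
```

This covers plain text only. Math text written as $...$ is configured separately through the mathtext.* rcParams and is not addressed by this sketch.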


Get this bounty!!!

#StackBounty: #python #python-3.x #web-scraping Take information from a webpage and compare to previous request

Bounty: 50

After making some improvements based on my previous code review, I have tried to apply what I learned to become a better coder, and now I am back asking for another code review of the parts I think could be better.

The purpose of this code is to monitor a specific site, checking it at a random interval of 30 to 120 seconds. If something has changed, it goes through a few if statements, as you can see, and then posts to my Discord that a change has been made.

This is what I have created:

monitoring.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import concurrent.futures
import random
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List

import pendulum
from loguru import logger

from scrape_values import Product

store: str = "shelta"
link: str = "https://shelta.se/sneakers/nike-air-zoom-type-whiteblack-cj2033-103"

# -------------------------------------------------------------------------
# Utils
# -------------------------------------------------------------------------
_size_filter: Dict[str, datetime] = {}


def monitor_stock():
    """
    Check whether a restock or countdown change has happened on the website
    """
    payload = Product.from_page(url=link).payload

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        while True:

            # Request for new product information
            new_payload = Product.from_page(url=link).payload

            # Release sleep
            release_date_sleeper(new_payload)

            # Check countdown timer comparision
            if countdown_timer_comparision(payload, new_payload):
                # Send notification to discord
                executor.submit(send_notification, new_payload, "Timer change!")
                # Replace list
                payload["displayCountDownTimer"] = new_payload["displayCountDownTimer"]

            # Check sizes comparision
            if sizes_comparision(payload, new_payload):
                # Send notification to discord
                executor.submit(send_notification, new_payload, "Restock!")
                # Replace list
                payload["sizes"] = new_payload["sizes"]

            else:
                # No changes happen
                logger.info("No changes made")

                payload["sizes"] = new_payload["sizes"]
                time.sleep(random.randint(30, 120))


def release_date_sleeper(payload) -> None:
    """
    Check if there is a release date on the website. If there is, sleep until shortly before it to save resources
    :param payload:
    """
    if payload.get('releaseDate'):
        # Wake up 10 seconds before the release.
        wake_up = payload["releaseDate"].subtract(seconds=10)
        delta_seconds = (wake_up - pendulum.now()).total_seconds()
        # Only sleep when the wake-up time is still in the future.
        if delta_seconds > 0:
            logger.info(f'Release date enabled | Will sleep to -> {wake_up.to_datetime_string()}')
            time.sleep(delta_seconds)


def countdown_timer_comparision(payload, new_payload) -> bool:
    """
    Compare the first request with the latest request and see if the countdown timer has changed on the website
    :param payload: First request made
    :param new_payload: Latest request made
    :return: bool
    """
    if new_payload.get("displayCountDownTimer") and payload["displayCountDownTimer"] != new_payload[
        "displayCountDownTimer"]:
        logger.info(f'Detected new timer change -> Name: {new_payload["name"]} | Display Time: {new_payload["displayCountDownTimer"]}')
        return True


def sizes_comparision(payload, new_payload) -> bool:
    """
    Compare the first request with the latest request and see if the sizes have changed on the website
    :param payload: First request made
    :param new_payload: Latest request made
    :return: bool
    """
    if payload["sizes"] != new_payload["sizes"]:
        if spam_filter(new_payload["delay"], new_payload["sizes"]):
            logger.info(f'Detected restock -> Name: {new_payload["name"]} | Sizes: {new_payload["sizes"]}')
            return True


def send_notification(payload, status) -> Any:
    """
    Send to discord
    :param payload: Payload of the product
    :param status: Type of status that being sent to discord
    """
    payload["status"] = status
    payload["keyword"] = True
    # FIXME: call create_embed(payload) for post to discord
    # See more here https://codereview.stackexchange.com/questions/260043/creating-embed-for-discord-reading-from-dictionary


def spam_filter(delay: int, requests: List[str]) -> List[str]:
    """
    Filter requests to only those that haven't been made previously within our defined cooldown period.

    :param delay: Delta seconds
    :param requests:
    :return:
    """
    # Get filtered set of requests.
    filtered = [
        r for r in list(set(requests))
        if (
              r not in _size_filter
                or datetime.now() - _size_filter[r] >= timedelta(seconds=delay)
        )
    ]
    # Refresh timestamps for requests we're actually making.
    for r in filtered:
        _size_filter[r] = datetime.now()

    return filtered


if __name__ == "__main__":
    monitor_stock()

scrape_values.py

import json
import re
from dataclasses import dataclass
from typing import List, Optional

import requests
from bs4 import BeautifulSoup


@dataclass
class Product:
    name: Optional[str] = None
    price: Optional[str] = None
    image: Optional[str] = None
    sizes: List[str] = None

    @staticmethod
    def get_sizes(doc: BeautifulSoup) -> List[str]:
        pat = re.compile(
            r'^<script>var JetshopData='
            r'({.*})'
            r';</script>$',
        )
        for script in doc.find_all('script'):
            match = pat.match(str(script))
            if match is not None:
                break
        else:
            return []

        data = json.loads(match[1])
        return [
            variation
            for get_value in data['ProductInfo']['Attributes']['Variations']
            if get_value.get('IsBuyable')
            for variation in get_value['Variation']
        ]

    @classmethod
    def from_page(cls, url: str) -> Optional['Product']:
        with requests.get(url) as response:
            if not response.ok:
                return None
            doc = BeautifulSoup(response.text, 'html.parser')

        name = doc.select_one('h1.product-page-header')
        price = doc.select_one('span.price')
        image = doc.select_one('meta[property="og:image"]')

        return cls(
            name=name and name.text.strip(),
            price=price and price.text.strip(),
            image=image and image['content'],
            sizes=cls.get_sizes(doc),
        )

    @property
    def payload(self) -> dict:
        return {
            "name": self.name or "Not found",
            "price": self.price or "Not found",
            "image": self.image or "Not found",
            "sizes": self.sizes,
        }

My concern is that I may have gone about this the wrong way by splitting it into multiple functions that might not be necessary. I'm not sure, and I hope to get some useful feedback. Looking forward to it!
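
For reference, the loop described at the top of the question (fetch, compare against the previous result, notify on change, sleep a random 30 to 120 seconds) can be sketched independently of the scraping details. This is a minimal sketch, not the code under review: poll, diff_payloads, and the fetch, on_change, rounds, and sleep parameters are all hypothetical names introduced here, and the fetch function is injected so the loop can be exercised without network access.

```python
import random
import time
from typing import Callable, Dict, List, Optional


def diff_payloads(old: Dict, new: Dict, keys: List[str]) -> List[str]:
    """Return the keys whose values differ between two payload dicts."""
    return [k for k in keys if old.get(k) != new.get(k)]


def poll(
    fetch: Callable[[], Dict],
    on_change: Callable[[str, Dict], None],
    keys: List[str],
    rounds: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Fetch repeatedly and call on_change(key, payload) for each changed key.

    rounds=None loops forever; a number limits the iterations (useful in tests).
    """
    previous = fetch()
    done = 0
    while rounds is None or done < rounds:
        # Always sleep between requests, whether or not something changed.
        sleep(random.randint(30, 120))
        current = fetch()
        for key in diff_payloads(previous, current, keys):
            on_change(key, current)
        previous = current
        done += 1
```

In the question's terms, fetch would wrap Product.from_page(url=link).payload and on_change would submit send_notification to the executor. One design point the sketch makes explicit is that the sleep happens on every iteration, not only in the branch where nothing changed.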


#StackBounty: #python #python-3.x #discord Creating embed for Discord reading from dictionary

Bounty: 50

I have been working on code that builds a payload dict with values such as store, name, and so on, which you will see shortly. The idea is that the script checks whether each value is present in the payload (dict) and, if it is, adds it to a Discord embed. As simple as it sounds.

I have done something like this:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import time
from threading import Thread
from typing import Dict

import pendulum
from discord_webhook import DiscordEmbed, DiscordWebhook
from loguru import logger

mixed_filtered = {
    'test1': 'https://discordapp.com/api/webhooks/529345345345345/6h4_yshmNDKktdT-0VevOhqdXG9rDhRWclIfDD4jY8IbdCQ5-kllob-k1251252151',
    'test2': 'https://discordapp.com/api/webhooks/529674575474577/6h4_yshmNDKktdT-0VevOhqdXG9rDhRWclIfDD4jY8IbdCQ5-kllob-fhgdfdghfhdh'
}

mixed_unfiltered = {
    'test1': 'https://discordapp.com/api/webhooks/12412421412412/6h4_yshmNDKktdT-0VevOhqdXG9rDhR12412412414Q5-kllob-kI2jAxCZ5PdIn',
    'test2': 'https://discordapp.com/api/webhooks/529617352682110997/6h4_yshmNDKktdT-0VevOhqdXG912412412412IbdCQ5-kllob-kI2jAxCZ5PdIn'
}


def create_embed(payload: dict) -> None:
    # -------------------------------------------------------------------------
    # Name of the product, URL of the product & New! or Restock! url
    # -------------------------------------------------------------------------
    embed = DiscordEmbed(
        url=payload["link"],
        description=payload["status"],
        color=8149447
    )

    # -------------------------------------------------------------------------
    # Image product
    # -------------------------------------------------------------------------
    embed.set_thumbnail(
        url=payload["image"]
    )

    # -------------------------------------------------------------------------
    # Store Name
    # -------------------------------------------------------------------------
    embed.add_embed_field(
        name="Site",
        value=f'{payload["store"]}'
    )

    # -------------------------------------------------------------------------
    # The price of the product
    # -------------------------------------------------------------------------
    if payload.get("price"):
        embed.add_embed_field(
            name="Price",
            value=payload["price"]
        )

    # -------------------------------------------------------------------------
    # If store is Nike
    # -------------------------------------------------------------------------
    if "Nike" in payload["store"]:

        if payload.get("nikeStatus"):
            embed.add_embed_field(
                name="\u200b",
                value="\u200b"
            )

            embed.add_embed_field(
                name="Status",
                value=payload["nikeStatus"]
            )

        # -------------------------------------------------------------------------
        # Nike  Sales Channel
        # -------------------------------------------------------------------------
        if payload.get("salesChannel"):
            embed.add_embed_field(
                name="Sales Channel",
                value="\n".join(payload["salesChannel"])
            )

    # -------------------------------------------------------------------------
    # Sizes available
    # Add extra spaces for sizes to make it cleaner for discord embed
    # -------------------------------------------------------------------------
    if payload.get("sizes"):
        # If we have stock in values then sum it up
        payload["stock"] = sum(v for v in payload["sizes"].values() if v)

        payload["sizes"] = [f"{k} - ({v})" if v else k for k, v in payload["sizes"].items()]

        embed.add_embed_field(
            name="\u200b",
            value="\u200b"
        )

        characterCount, i = 0, 0
        for j, item in enumerate(payload["sizes"]):

            # Discord throws an error if a single embed field goes over 1020 characters.
            # To stay on the safe side, start a new field once the running count would exceed 900.
            if len(item) + characterCount > 900:
                embed.add_embed_field(
                    name="Sizes",
                    value="\n".join(payload["sizes"][i:j])
                )
                characterCount, i = len(item), j
            else:
                characterCount += len(item)

        if characterCount:
            embed.add_embed_field(
                name="Sizes",
                value="\n".join(payload["sizes"][i:])
            )

        embed.add_embed_field(
            name="\u200b",
            value="\u200b"
        )

        embed.add_embed_field(
            name="\u200b",
            value="\u200b"
        )

    # -------------------------------------------------------------------------
    # If store is footlocker
    # -------------------------------------------------------------------------
    if "Footlocker" in payload["store"]:
        if payload.get("stockLoaded"):
            embed.add_embed_field(
                name="Stock Loaded",
                value=payload["stockLoaded"].upper()
            )

        if payload.get("styleCode"):
            embed.add_embed_field(
                name="\u200b",
                value="\u200b"
            )
            embed.add_embed_field(
                name="Style Code",
                value=payload["styleCode"]
            )

    # -------------------------------------------------------------------------
    # Release date for the product
    # -------------------------------------------------------------------------
    if payload.get("releaseDate"):
        embed.add_embed_field(
            name="Release Date",
            value=payload["releaseDate"].to_datetime_string()
        )

    # -------------------------------------------------------------------------
    # Stock keeping unit, e.g. 508214-660
    # -------------------------------------------------------------------------
    if payload.get("sku"):
        embed.add_embed_field(
            name="SKU",
            value=payload["sku"]
        )

    # -------------------------------------------------------------------------
    # Total stock of the product
    # -------------------------------------------------------------------------
    if payload.get("stock"):
        embed.add_embed_field(
            name="Total Stock",
            value=payload["stock"]
        )

    # -------------------------------------------------------------------------
    # Login/Cart/Checkout shortcut links
    # -------------------------------------------------------------------------
    embed.add_embed_field(
        name="Shortcuts Links",
        value=f'{" | ".join(shortcuts for shortcuts in payload["shortcut"])}'
    )

    # -------------------------------------------------------------------------
    # Quick task for bots
    # -------------------------------------------------------------------------
    if payload.get("quicktask"):
        embed.add_embed_field(
            name="Quick Tasks",
            value=f'{" | ".join(shortcuts for shortcuts in payload["quicktask"])}'
        )

    # -------------------------------------------------------------------------
    # Footer timestamp
    # -------------------------------------------------------------------------
    embed.set_footer(
        text=f'AutoSnkr | {pendulum.now("Europe/Stockholm").format("YYYY-MM-DD [[]HH:mm:ss.SSSS[]]")}'
    )

    # -------------------------------------------------------------------------
    # Set title on the embed
    # -------------------------------------------------------------------------
    if payload.get('stock') and payload.get('name'):
        embed.title = f'({payload["stock"]}) {payload["name"]}'
    elif payload.get('name'):
        embed.title = payload["name"]
    else:
        embed.title = payload.get('link')

    # -------------------------------------------------------------------------
    # Send payload/embed to Discord Notification function
    # -------------------------------------------------------------------------
    collection = mixed_filtered if payload["keyword"] else mixed_unfiltered

    for region, discord_collection in collection.items():
        webhook = DiscordWebhook(
            url=discord_collection,
            username="AutoSnkr Monitor",
        )

        webhook.add_embed(embed)

        # Adding thread so each URL can post as fast as possible without needing to wait for each other
        Thread(
            target=post_embed,
            args=(
                payload,
                region,
                webhook
            )
        ).start()


def post_embed(payload: Dict, region: str, webhook: DiscordWebhook) -> None:
    success: bool = False

    while not success:
        try:
            response = webhook.execute()
            success = response.ok

            # If we get a 429, retry after a short delay
            if response.status_code == 429:
                sleep_time = int(response.headers["retry-after"]) / 1000
                logger.debug(f"Rate limit -> Retrying in {sleep_time} seconds")
                time.sleep(sleep_time)
                continue

            # any response other than a 429 or a 200 OK is an error.
            if not response.ok:
                # FIXME Add discord notification and raise exception
                pass

            logger.info(f"Successfully sent to Discord Reporter -> {region}")

        except Exception as err:
            # FIXME Add discord notification and raise exception
            pass


if __name__ == '__main__':
    create_embed(
        {
            "store": "Basket4ballers",
            "link": "https://www.basket4ballers.com/en/pg/26471-nike-pg5-bred-cw3143-101.html",
            "name": "Nike PG5 Bred",
            "price": "EUR 119.9",
            "image": "https://cdn1.basket4ballers.com/114821-large_default/nike-pg5-bred-cw3143-101.jpg",
            "sizes": {
                "[EU 38.5](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205049)": 1,
                "[EU 39](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205052)": 1,
                "[EU 40](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205055)": 3,
                "[EU 40.5](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205058)": 4,
                "[EU 41](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205061)": 9,
                "[EU 42](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205064)": 11,
                "[EU 42.5](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205067)": 11,
                "[EU 43](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205070)": 16,
                "[EU 44](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205073)": 21,
                "[EU 44.5](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205076)": 15,
                "[EU 45](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205079)": 20,
                "[EU 45.5](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205082)": 7,
                "[EU 46](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205085)": 17,
                "[EU 47](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205088)": 7,
                "[EU 47.5](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205091)": 5,
                "[EU 48](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205094)": 3,
                "[EU 48.5](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205097)": 2,
                "[EU 49.5](https://www.basket4ballers.com/?controller=cart&add=1&as=true&qty=1&id_product=26471&token=e4d64f25476dcee4b08744d382dc405b&ipa=205100)": 1},
            "shortcut": ["[Login](https://www.basket4ballers.com/en/authentification?back=my-account)",
                         "[Cart](https://www.basket4ballers.com/en/commande)",
                         "[Checkout Delivery](https://www.basket4ballers.com/en/commande?step=1)",
                         "[Checkout Shipping Service](https://www.basket4ballers.com/en/commande)",
                         "[Checkout PAyment](https://www.basket4ballers.com/en/commande)"],
            "webhook": "mixed",
            "status": "Restock!",
            "keyword": True
        }
    )

The mock data is at the very bottom; in the future I will send the payload to the function instead.

I wonder what I can do to reduce the amount of code while still getting the job done. I feel there should be a much cleaner way to do this than what I did, and I am looking forward to seeing what can be improved 🙂

Let me know if any information is missing. The script should be runnable by copying and pasting it, but make sure to create your own Discord webhooks to test the embed. Unfortunately, I need to change mine so that no one can spam me 🙂



#StackBounty: #python #python-3.x #subprocess #pipe python process doesn't write to stdout

Bounty: 100

I am making a terminal command line interface program as part of a bigger project. I want the user to be able to run arbitrary commands (like in cmd). The problem is that when I start a python process using subprocess, python doesn’t write anything to stdout. I am not even sure if it reads what I wrote in stdin. This is my code:

from os import pipe, read, write
from subprocess import Popen
from time import sleep

# Create the stdin/stdout pipes
out_read_pipe_fd, out_write_pipe_fd = pipe()
in_read_pipe_fd, in_write_pipe_fd = pipe()

# Start the process
proc = Popen("python", stdin=in_read_pipe_fd, stdout=out_write_pipe_fd,
             close_fds=True, shell=True)

# Make sure the process started
sleep(2)

# Write stuff to stdin
write(in_write_pipe_fd, b"print(\"hello world\")\n")

# Read all of the data written to stdout 1 byte at a time
print("Reading:")
while True:
    print(repr(read(out_read_pipe_fd, 1)))

The code above works when I change "python" to "myexe.exe" where myexe.exe is my hello world program written in C++ compiled by MinGW. Why does this happen? This is the full code but the above example shows my problem. It also works correctly when I change "python" to "cmd".

PS: when I run python from the command prompt it gives me:

Python 3.7.9 (tags/v3.7.9:13c94747c7, Aug 17 2020, 18:58:18) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>>

That means that there should be stuff written to stdout.
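
One possible explanation, offered as an assumption rather than a confirmed diagnosis: when stdin is a pipe instead of a terminal, the interpreter does not start its interactive prompt. It reads the whole of stdin as a script and runs it only once the writing side is closed. The sketch below uses subprocess.PIPE and communicate() (which closes stdin after writing) in place of the raw os.pipe() descriptors; run_snippet is a hypothetical helper name.

```python
import subprocess
import sys


def run_snippet(source: str) -> str:
    """Feed `source` to a child interpreter over stdin and return its stdout."""
    proc = subprocess.Popen(
        [sys.executable],  # the same interpreter that runs this script
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )
    # communicate() writes the input, closes stdin (EOF) and then reads
    # until the child exits, so the child knows the "script" is complete.
    out, _ = proc.communicate(source)
    return out


if __name__ == "__main__":
    print(repr(run_snippet('print("hello world")\n')))
```

If a truly interactive session over pipes is needed, starting the child with python -i -u forces interactive, unbuffered mode. Note that the banner and the >>> prompt are written to stderr, not stdout, so they would not show up on the stdout pipe either way.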



#StackBounty: #html #python-3.x #request #grequests Extracting Text From Asynchronous Request Using grequests

Bounty: 50

I am trying to extract the text from responses to requests made with the grequests library, but I cannot figure out how to do so.

With the Requests library I would do:

r = requests.get('https://www.google.com')
htmls.append(r.text)

With grequests, however, I only get a list of response objects showing the status code, not the text.

rs = (grequests.get(u) for u in urls)
result = grequests.map(rs)

What I’ve tried
result = grequests.map(rs.text)

The code above raises AttributeError: 'generator' object has no attribute 'text'.

My desired output is a list of HTML texts: the text where the response code is 200, and None otherwise.
How can I achieve that?

Desired Output:

response_code = [<Response [200]>,<Response [404]>,<Response [200]>]
htmls = ['html1', None, 'html2']
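
A minimal sketch of the desired mapping, assuming each item returned by grequests.map() is either a response object with status_code and text attributes or None (for a request that failed outright). The helper name texts_or_none is hypothetical, and the demo uses stand-in objects so that it runs without network access.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List, Optional


def texts_or_none(responses: Iterable[Optional[Any]]) -> List[Optional[str]]:
    """Return response.text for 200 responses and None for everything else."""
    return [
        r.text if r is not None and r.status_code == 200 else None
        for r in responses
    ]


if __name__ == "__main__":
    # Stand-ins for real responses (assumption: same attribute names).
    fake_responses = [
        SimpleNamespace(status_code=200, text="html1"),
        SimpleNamespace(status_code=404, text="not found"),
        SimpleNamespace(status_code=200, text="html2"),
    ]
    print(texts_or_none(fake_responses))
```

With the code from the question, this would be called as htmls = texts_or_none(grequests.map(rs)): the .text attribute lives on each response in the mapped list, not on the generator passed to map().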



#StackBounty: #python-3.x #ffmpeg #youtube #discord.py Discord.py – Streaming Youtube Live into Voice

Bounty: 50

I’m back on the stacks after being banned from asking questions (apparently they were terrible), and I need help with something I have researched extensively without finding an answer.

I need to stream a YouTube live stream’s audio into a voice channel (VC) as radio music using discord.py Rewrite. The youtube_dl and FFmpeg posts I found were either outdated (discord.py==0.16.x) or involved downloading YouTube videos, which is not possible with ongoing streams.

I also looked through create_ytdl_player in voice_client.py from discord.py v0.16.12, but to no avail: I could not find a solution.

This is my current situation in my code. It is a background task in one of my cogs.

    @loop(seconds=5)
    async def check_voice(self):
        try:
            channel = await self.bot.fetch_channel(819346920082112532)
        except NotFound:
            print("[IR] `🎧Infinite Lofi` voice channel not found. Stopping check_voice loop.")
            self.check_voice.cancel()
            return

        try:
            infinite_lofi = await channel.connect(timeout=2, reconnect=True)
        except ClientException:
            return
        else:
            print("[IR] Connected to voice channel. 🚧 Loading from config livestream URL...")
            channel = await self.bot.fetch_channel(780849795572826213)
            message = await channel.fetch_message(780859270010503188)
            ctx = await self.bot.get_context(message)

            # infinite_lofi.play()  <<< Looking to play a youtube live stream audio.

According to the discord.py docs, I need an AudioSource to pass into VoiceClient.play().
My goal is to play LIVE audio, not download. In other words, stream.
If you need an example, I am looking to stream from this lofi radio.

Update

Following a comment by Łukasz Kwieciński, I checked an example in the official discord.py repository. I only needed the "from_url" function of the YTDLSource class. However, my problem now is that playback starts but stops after 2-4 seconds of streaming. About 1-2 minutes later, a yellow message appears in the console saying "skipping 20 segments ahead, expired from playlists"; the audio plays again and then stops once more.

player = await YTDLSource.from_url("https://www.youtube.com/watch?v=5qap5aO4i9A", loop=self.bot.loop, stream=True)
infinite_lofi.play(player, after=lambda e: print(f"Player error: {e}")) 
# ^^^ plays for a few seconds and stops, repeats every 1-2 minutes.
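
A commonly suggested mitigation for stalls like this is to pass FFmpeg's HTTP reconnect flags as input options. Whether that cures this particular stall is an assumption, not something the question confirms. The sketch below only builds the keyword arguments; ffmpeg_stream_kwargs is a hypothetical helper name, and the 5-second maximum delay is an arbitrary choice.

```python
from typing import Dict


def ffmpeg_stream_kwargs(max_delay_seconds: int = 5) -> Dict[str, str]:
    """Build keyword arguments for an FFmpeg-backed audio source."""
    return {
        # Input options: ask FFmpeg to reconnect when the HTTP stream drops.
        "before_options": (
            "-reconnect 1 -reconnect_streamed 1 "
            f"-reconnect_delay_max {max_delay_seconds}"
        ),
        # Output option: discard any video track and keep audio only.
        "options": "-vn",
    }


if __name__ == "__main__":
    print(ffmpeg_stream_kwargs())
```

These would be handed to the audio source, for example discord.FFmpegPCMAudio(stream_url, **ffmpeg_stream_kwargs()), where stream_url stands for the direct media URL that youtube_dl extracts.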



#StackBounty: #python #python-3.x #multithreading #thread-safety #python-multithreading How to make a third party obj thread safe in py…

Bounty: 50

I want to use a third-party module in Python like this:

import some_module

class MyService():
    
    def __init__(self):
        self.some_module_obj = some_module.some_obj()

    def run(self,some_parameter):
        self.some_module_obj.some_attritude(some_parameter)

I know that some_module.some_obj and its method some_attritude are not thread safe. My question is: how can I make MyService thread safe?

update 1:

I have seen Artiom Kozyrev’s code. Is the code below right?

import some_module
import threading
GLOBAL_LOCK = threading.Lock()

class Singleton(type):

    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            with GLOBAL_LOCK:
                if cls not in cls._instances:
                    cls._instances[cls] = super(Singleton, cls).__call__(
                        *args, **kwargs
                    )
        return cls._instances[cls]

class MyService(metaclass=Singleton):
    
    def __init__(self):
        self.some_module_obj = some_module.some_obj()
        self.rlock = threading.RLock()

    def run(self,some_parameter):
        result = None
        with self.rlock:
            result = self.some_module_obj.some_attritude(some_parameter)
        return result     
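
To illustrate the locking idea on its own, here is a small self-contained sketch. UnsafeCounter is a hypothetical stand-in for some_module.some_obj, and LockedService and hammer are made-up names. It assumes that all access to the wrapped object goes through the service; if the third-party module keeps shared state at module level, a single shared lock (or a single service instance, as in the singleton above) would be needed instead.

```python
import threading


class UnsafeCounter:
    """Hypothetical stand-in for some_module.some_obj (not thread safe)."""

    def __init__(self) -> None:
        self.value = 0

    def some_attritude(self, amount: int) -> int:
        current = self.value            # read ...
        self.value = current + amount   # ... then write: a race without a lock
        return self.value


class LockedService:
    """Serializes every call into the wrapped object with one lock."""

    def __init__(self, wrapped: UnsafeCounter) -> None:
        self._wrapped = wrapped
        self._lock = threading.Lock()

    def run(self, some_parameter: int) -> int:
        with self._lock:
            return self._wrapped.some_attritude(some_parameter)


def hammer(service: LockedService, n_threads: int = 8, calls: int = 1000) -> None:
    """Call service.run(1) from several threads at once."""

    def worker() -> None:
        for _ in range(calls):
            service.run(1)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    counter = UnsafeCounter()
    hammer(LockedService(counter))
    print(counter.value)
```

A plain Lock is enough here because run never calls itself; an RLock only matters if the same thread may re-enter the locked section.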



#StackBounty: #python #python-3.x #console #git git-user – Working on a shared local repository with multiple users made easier

Bounty: 50

I have recently been in a situation where multiple developers worked on a shared local git repository under the same Linux user¹. As one can imagine, it can easily become a bit annoying not to commit changes as someone else if you don’t check the values of git config user.name and git config user.email carefully and then change them accordingly. The same problem might also arise if you happen to work on several projects on your local machine side-by-side where you have to use different "identities", e.g. some work-related and private projects. I decided to tackle this with a little "git extension" that allows you to view and change the committer identity with less of a hassle.

Enter git user. To get an idea of its intended use, have a look at the output of git user -h:

git-user - Helps you to manage multiple users in a shared local repository

Subcommands
-----------
add, update, show, delete

Use git user <subcommand> --help to get more help

Examples
--------
# Add two test users
> git user add itsme "My Last" my.last@domain.tld --credential-username itsme
> git user add itsyou "Your Last" your.last@domain.tld

# activate the first user
> git user itsme
My Last <my.last@domain.tld>
# you can check this yourself with git config user.name and git config user.email

# activate the second user
> git user itsyou
Your Last <your.last@domain.tld>

# use git user with no arguments to check which values are currently set
> git user
Your Last <your.last@domain.tld>

> git user show
    itsme: My Last <my.last@domain.tld> (push as 'itsme')
    itsyou: Your Last <your.last@domain.tld>
> git user delete itsyou --force
> git user show
    itsme: My Last <my.last@domain.tld>

The code that makes this possible is as follows:

#!/usr/bin/env python3
"""
... omitted for brevity, see help text in question ...
"""
import argparse
import json
import os
import sys
from collections import defaultdict
from subprocess import check_output, CalledProcessError, DEVNULL

DESCRIPTION = sys.modules[__name__].__doc__

__version__ = "0.2.0"


class UserbaseError(Exception):
    """Base class for userbase related exceptions"""


class UserDoesNotExist(UserbaseError):
    """Custom exception to indicate that a user does not exist"""


class UserDoesAlreadyExist(UserbaseError):
    """Custom exception to indicate that a user does already exist"""


class Userbase:
    """Abstraction of the underlying user storage file"""

    DATA_KEYS = ("name", "email", "credential_username")

    def __init__(self, users_file):
        self._users_file = users_file
        self._users = defaultdict(lambda: defaultdict(str))
        self._load()

    def _load(self):
        if not os.path.isfile(self._users_file):
            print("Creating default at '{}'.".format(self._users_file))
            os.makedirs(os.path.dirname(self._users_file), exist_ok=True)
            self.save()

        with open(self._users_file, "r") as json_file:
            self._users.update(json.load(json_file))

    def save(self):
        """Dump the current userbase to file"""
        with open(self._users_file, "w") as json_file:
            json.dump(self._users, json_file)

    def is_known(self, alias):
        """Check if an alias is part of the userbase"""
        return alias in self._users

    def get(self, alias):
        """Try to get user data for an alias

        Parameters
        ----------
        alias: str
            access the data stored under this alias

        Returns
        -------
        dict
            the data associated with the alias. Keys are listed in
            Userbase.DATA_KEYS

        Raises
        ------
        UserDoesNotExist
            If the user is not part of the userbase
        """
        if self.is_known(alias):
            return self._users[alias]

        raise UserDoesNotExist(
            "User with alias '{}' unknown".format(alias)
        )

    def as_dict(self):
        """Access the userbase as a dict"""
        return dict(self._users)

    def update(self, alias, **kwargs):
        """Update the data stored under an alias

        This function does not check whether the user exists or not. If it
        exists, its data will simply be overwritten.

        Parameters
        ----------
        alias: str
            update the data found under this alias
        kwargs: dict
            the script will look for the keys from Userbase.DATA_KEYS in the
            kwargs in order to update the internal database
        """
        for key in Userbase.DATA_KEYS:
            new_value = kwargs[key]
            if new_value is not None:
                self._users[alias][key] = new_value

    def delete(self, alias):
        """Delete a user from the userbase given its alias

        Parameters
        ----------
        alias: str
            the alias to look for

        Raises
        ------
        UserDoesNotExist
            you can probably guess when this is raised
        """
        try:
            del self._users[alias]
        except KeyError:
            raise UserDoesNotExist(
                "User with alias '{}' unknown".format(alias)
            )


try:
    _USERS_FILE = os.environ["GITUSER_CONFIG"]
except KeyError:
    _USERS_FILE = os.path.join(
        os.path.expanduser("~"), ".config", "git-user", "users.json"
    )
_USERS_FILE = os.path.abspath(_USERS_FILE)
_USERS = Userbase(_USERS_FILE)


def add(args):
    """Add a user to the userbase"""
    if _USERS.is_known(args.alias):
        raise UserDoesAlreadyExist(
            "User with alias '{}' already exists. ".format(args.alias)
            + "Delete first or use 'update'"
        )
    kwargs = {name: getattr(args, name, "") for name in Userbase.DATA_KEYS}
    _USERS.update(args.alias, **kwargs)


def update(args):
    """Interactive wrapper around Userbase.update"""
    if not _USERS.is_known(args.alias):
        raise UserDoesNotExist(
            "User with alias '{}' does not exist. ".format(args.alias)
            + "Add first using 'add'"
        )
    kwargs = {name: getattr(args, name, "") for name in Userbase.DATA_KEYS}
    _USERS.update(args.alias, **kwargs)


def delete(args):
    """Interactive wrapper around Userbase.delete"""
    if _USERS.is_known(args.alias) and not args.force:
        while True:
            answer = input(
                "Really delete user '{}'? [N/y] ".format(args.alias)
            )
            answer = answer.lower().strip()
            if answer in ("yes", "y"):
                break
            if answer in ("no", "n", ""):
                return
    _USERS.delete(args.alias)


def show(args):
    """Show the data of one or all the users in the userbase"""
    to_show = tuple(sorted(_USERS.as_dict().keys()))
    if args.alias is not None:
        to_show = (args.alias, )

    if to_show:
        for alias in to_show:
            cfg = _USERS.get(alias)
            msg = "    {}: {name} <{email}>".format(alias, **cfg)
            if "credential_username" in cfg.keys():
                msg += " (push as '{credential_username}')".format(**cfg)
            print(msg)
    else:
        print("No known aliases.")


def switch(args):
    """Switch to another user from the userbase and/or show current config

    Set args.quiet to True to avoid seeing the current config as console output
    """
    if args.alias is not None:
        cfg = _USERS.get(args.alias)
        _git_config_name(cfg["name"])
        _git_config_email(cfg["email"])
        credential_username = cfg.get("credential_username", "")
        try:
            _git_config_credential_username(credential_username)
        except CalledProcessError as ex:
            if credential_username not in ("", None):
                raise ex
    if not args.quiet:
        _show_git_config()


def _show_git_config():
    try:
        git_name = _git_config_name().strip().decode("utf8")
        git_email = _git_config_email().strip().decode("utf8")
    except CalledProcessError:
        print(
            "Currently there is no (default) user for this repository.\n"
            "Select one using git user <alias> or manually with git config"
        )
        return
    try:
        git_cred_username = _git_config_credential_username().strip().decode("utf8")
        print("{} <{}> (push as '{}')".format(git_name, git_email, git_cred_username))
        return
    except CalledProcessError:
        # git config has a non-zero exit status if no value is set
        pass

    print("{} <{}>".format(git_name, git_email))


def _git_config_name(name=None):
    args = ["git", "config", "user.name"]
    if name is not None:
        args.append(str(name))
    return check_output(args)


def _git_config_email(email=None):
    args = ["git", "config", "user.email"]
    if email is not None:
        args.append(str(email))
    return check_output(args)


def _git_config_credential_username(username=None):
    # remote_url = _git_get_remote_url(remote).strip().decode("utf8")
    args = ["git", "config"]
    if username == "":
        args.extend(["--remove-section", "credential"])
    else:
        args.append("credential.username")
        if username is not None:
            args.append(username)
    return check_output(args, stderr=DEVNULL)


def main():
    """CLI of the git user helper"""
    parser = argparse.ArgumentParser(
        description=DESCRIPTION, formatter_class=argparse.RawTextHelpFormatter)

    # click might be an alternative here, but I want this to be as lightweight
    # as possible
    try:
        command = sys.argv[1]
    except IndexError:
        command = "switch"

    if command in ("add", "update", "delete", "show"):
        subparsers = parser.add_subparsers()

        add_subparser = subparsers.add_parser(
            "add", description="Add name and email for an alias")
        add_subparser.add_argument(
            "alias", help="Alias used to access this user's name and email")
        add_subparser.add_argument(
            "name", help="This value gets passed to git config user.name",
            default="")
        add_subparser.add_argument(
            "email", help="This value gets passed to git config user.email",
            default="")
        add_subparser.add_argument(
            "--credential-username",
            help="optional: push credential username",
            default="")
        add_subparser.set_defaults(command=add)

        update_subparser = subparsers.add_parser(
            "update", description="Update an alias")
        update_subparser.add_argument(
            "alias", help="Alias used to access this user's name and email")
        update_subparser.add_argument(
            "--name", help="This value gets passed to git config user.name",
            default=None)
        update_subparser.add_argument(
            "--email", help="This value gets passed to git config user.email",
            default=None)
        update_subparser.add_argument(
            "--credential-username",
            help="optional: push credential username",
            default=None)
        update_subparser.set_defaults(command=update)

        delete_subparser = subparsers.add_parser(
            "delete", description="Delete name and email stored for an alias")
        delete_subparser.add_argument(
            "alias", help="Delete name, email and possibly credentials for this alias")
        delete_subparser.add_argument(
            "--force", action="store_true", help="Delete without interaction"
        )
        delete_subparser.set_defaults(command=delete)

        show_subparser = subparsers.add_parser(
            "show",
            description="Show name and email associated with this alias")
        show_subparser.add_argument("alias", nargs="?", default=None)
        show_subparser.set_defaults(command=show)
    else:
        parser.add_argument(
            "alias",
            nargs="?",
            default=None,
            help="Use git config with name and email that belong to this alias")
        parser.add_argument(
            "--quiet", action="store_true",
            help="Suppress confirmation output after setting the config"
        )
        parser.add_argument(
            "--version", action="store_true",
            help="show git and git-user version and exit"
        )
        parser.set_defaults(command=switch)

    args = parser.parse_args()
    if getattr(args, "version", False):
        print("git-user version {}".format(__version__))
        return

    try:
        args.command(args)
    except (UserbaseError, CalledProcessError) as err:
        print(err)
        sys.exit(1)

    # only save on proper exit
    _USERS.save()


if __name__ == "__main__":
    main()

To test it, copy and paste the file somewhere in your $PATH where git can pick it up, name it git-user and make it executable. A symlink with this name is also possible if you prefer to have the file with a .py extension.

Notes to kind reviewers

  • Every kind of feedback is welcome. I’m especially interested in how the script worked for you in terms of usability. Was the help text actually, well, helpful?
  • As one of the comments says, I knowingly ignored possibly useful packages like click in order to allow this to run on systems with no additional Python packages.
  • Support for signature keys that would make it easier to sign your commits as well is on the TODO list, but not implemented yet.
  • The code was checked with pylint and pycodestyle, so it should be in a reasonable shape regarding code style.

¹ Whether or not this is a good idea might be arguable, but that’s not the point here.



#StackBounty: #python #python-3.x #git #type-safety Proper Python type-annotation for variable parameters signature of inherited method…

Bounty: 50

I have a CLI lib to automate batch operations on GitHub with many use cases: create issue, merge PR, delete branch… Since they all operate in a similar fashion with regard to execution and output, I created a parent class GhUseCase as follows:

class GhUseCase:
    # ...other methods
    def action(self, github_repo: str, *args: Any, **kwargs: Any) -> None:
        """Execute some action in a repo."""
        raise NotImplementedError  # pragma: no cover

    def execute(
        self, *args: Any, **kwargs: Any
    ) -> Union[res.ResponseFailure, res.ResponseSuccess]:
        """Execute GitHubUseCase."""
        if self.github_repo:
            self.action(self.github_repo, *args, **kwargs)
        else:
            for github_repo in self.config_manager.config.github_selected_repos:
                self.action(github_repo, *args, **kwargs)
        return self.generate_response()

Basically, each use case inherits from this class and implements the action method. E.g., override 1 (delete branch):

import git_portfolio.use_cases.gh as gh

class GhDeleteBranchUseCase(gh.GhUseCase):
    """Github delete branch use case."""

    def action(self, github_repo: str, branch: str) -> None:  # type: ignore[override]
        """Delete branches."""
        github_service_method = "delete_branch_from_repo"
        self.call_github_service(github_service_method, github_repo, branch)

E.g., override 2 (create issue), with another signature:

import git_portfolio.domain.issue as i
import git_portfolio.use_cases.gh as gh

class GhCreateIssueUseCase(gh.GhUseCase):
    """Github create issue use case."""

    def action(  # type: ignore[override]
        self, github_repo: str, issue: i.Issue
    ) -> None:
        """Create issues."""
        github_service_method = "create_issue_from_repo"
        self.call_github_service(github_service_method, github_repo, issue)

The full use-case code can be found in this branch (files with names starting with gh).

The problem is that each use case’s action has a different parameter list, and I could not find a proper way to declare the type annotations without silencing Mypy’s override errors with type: ignore[override].

Any ideas on how to make this better and properly annotated?
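
One direction that might be worth exploring, sketched here under simplifying assumptions: make the base class generic over a single payload type, so that each subclass pins that type instead of changing the parameter list. The classes below are hypothetical, stripped-down stand-ins for GhUseCase and its subclasses (no config manager, no response objects); a use case needing several arguments would bundle them into one payload object such as a dataclass.

```python
from typing import Generic, List, TypeVar

T = TypeVar("T")


class UseCase(Generic[T]):
    """Simplified, hypothetical stand-in for GhUseCase."""

    def __init__(self, repos: List[str]) -> None:
        self.repos = repos
        self.log: List[str] = []

    def action(self, github_repo: str, payload: T) -> None:
        """Execute some action in a repo."""
        raise NotImplementedError

    def execute(self, payload: T) -> List[str]:
        """Run the action on every selected repo and return the log."""
        for repo in self.repos:
            self.action(repo, payload)
        return self.log


class DeleteBranch(UseCase[str]):
    """The payload is the branch name."""

    def action(self, github_repo: str, payload: str) -> None:
        self.log.append(f"delete {payload} from {github_repo}")


if __name__ == "__main__":
    print(DeleteBranch(["me/repo-a", "me/repo-b"]).execute("old-feature"))
```

Because the override keeps the same parameter names and only substitutes str for T, the signatures stay compatible and no type: ignore[override] comment is needed in this reduced form. Whether the trade-off of a single payload argument suits the real code base is a design decision for the author.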

