#StackBounty: #python Discord Bot querying an External API

Bounty: 50

I’m posting here to ask for help improving my Python, since I’m fairly new to it and come from C#.

The main functionality of this bot is parsing the Wargaming API for player related information.

Project structure:

project
|    main.py
└───ENV
│   
└───data
    |    classes.py
    |    dbcontext.py
    |    getStats.py

My Main:

import asyncio
import logging
import os
import sys
import re
import json

import discord
from discord.ext import commands

from data import dbcontext, log, secret, getStats as GetStats
from data.translations import en, de, pl ,tr
from data.classes import Config, Ship, Player, Stats, ReturnVal, ErrorType

sys.path.append(os.path.join(os.path.dirname(__file__), "data"))


"""
WoWs-Stats Bot.
Creator Fuyune

Uses Discord.py framework by Rapptz
"""

####### Bot Basic Configuration #######

# Bot instance; every command in this file is registered on it and is
# invoked with the "!" prefix.
bot = commands.Bot(command_prefix="!")
# Route records of the 'discord' logger at ERROR level and above into err.log
# (UTF-8, append mode so earlier runs are kept).
logger = logging.getLogger('discord')
logger.setLevel(logging.ERROR)
handler = logging.FileHandler(filename='err.log', encoding='utf-8', mode='a')
handler.setFormatter(logging.Formatter(
    '%(asctime)s:%(levelname)s:%(name)s: %(message)s'))
logger.addHandler(handler)
# Unregister the command named 'help'; help text is produced by writeHelp.
bot.remove_command('help')
# Module-level list of per-guild configurations.
configs = []
# Region codes (presumably the values allowed for Config.region -- confirm).
regions = ["eu","ru","na","asia"]
# Language codes; each one needs a matching module imported from
# data.translations because the helpers look it up via globals().
langs = ["de","en","pl","tr"] #remember adding to imports too
# NOTE(review): not used in the code shown here -- purpose to be confirmed.
amounts = {}

####### Bot Events #######

@bot.event
async def on_ready():
    """Log the start-up and cache the configuration of every connected guild.

    The module-level ``configs`` list is refilled in place, so no stale
    entries survive if the event fires more than once.
    """
    log.writeLog("init", "Ready")
    print("Bot Started")
    loaded = []
    for guild in bot.guilds:
        loaded.append(dbcontext.getConfig(guild.id))
        print("Connected to: " + guild.name)
        log.writeLog("Connected", guild.name)
    # Slice assignment mutates the module-level list; a plain
    # ``configs = []`` here would only create a throw-away local.
    configs[:] = loaded


@bot.command()
async def stats(ctx, *args):
    """Answer ``!stats [player]`` and ``!stats [player] [shipname]``.

    No arguments, or the single argument "help" (any case), shows the help
    embed. Otherwise the first argument is the player name and any further
    arguments are joined with spaces to form the ship name.
    """
    # NOTE(review): the GetStats/dbcontext calls below are synchronous, so the
    # coroutine does not yield while they run.
    if ctx.message.guild is None:
        await ctx.send("**I'm not allowed to answer in Private messages.**")
        return

    config = dbcontext.getConfig(ctx.guild.id)
    argCount = len(args)
    if argCount == 0 or (argCount == 1 and args[0].lower() == "help"):
        await writeHelp(ctx, config)
        return

    playerObject = GetStats.getPlayer(config, args[0])
    if playerObject is None or playerObject.id == 0:
        await writeError(ctx, config, ErrorType.UNKNOWN_PLAYER)
        return

    ship = None
    if argCount == 1:
        result = GetStats.getPlayerStats(config, playerObject)
    else:
        ship = dbcontext.getShip(" ".join(args[1:]))
        if ship.id == 0:
            await writeError(ctx, config, ErrorType.UNKNOWN_SHIP)
            return
        result = GetStats.getShipStats(config, playerObject, ship)

    if result.hidden:
        await writeError(ctx, config, ErrorType.HIDDEN_STATS)
    elif result.damage == 0:
        await writeError(ctx, config, ErrorType.UNKNOWN_STATS)
    elif ship is None:
        await writeAnswer(ctx, config, playerObject, result)
    else:
        await writeAnswer(ctx, config, playerObject, ship, result)


@bot.command()
async def statsr(ctx, *args):
    """Answer ``!statsr [season] [player]`` and ``!statsr [season] [player] [shipname]``.

    Fewer than two arguments, or "help" as first argument, shows the help
    embed. The season must be a number from 1 to 99 or ``s1``..``s4`` (any
    case); anything else is answered with UNKNOWN_SEASON. Arguments after the
    player name are joined with spaces to form the ship name.
    """
    if ctx.message.guild is None:
        await ctx.send("**I'm not allowed to answer in Private messages.**")
        return

    config = dbcontext.getConfig(ctx.guild.id)
    if len(args) < 2 or args[0].lower() == "help":
        await writeHelp(ctx, config)
        return

    # "!" is a literal character in a regular expression, not a negation, so
    # the pattern describes the *valid* seasons and the test is inverted here.
    # NOTE(review): the upper bound of 99 is an assumption; seasons the API
    # does not know still end up as UNKNOWN_STATS.
    validSeason = re.compile(r"(?:[1-9][0-9]?|s[1-4])", re.IGNORECASE)
    if not validSeason.fullmatch(args[0]):
        await writeError(ctx, config, ErrorType.UNKNOWN_SEASON)
        return
    # Lower-case first: the season check is case-insensitive, the mapping of
    # "s1".."s4" is not.
    season = convertSeason(args[0].lower())

    playerObject = GetStats.getPlayer(config, args[1])
    if playerObject is None or playerObject.id == 0:
        await writeError(ctx, config, ErrorType.UNKNOWN_PLAYER)
        return

    ship = None
    if len(args) > 2:
        ship = dbcontext.getShip(" ".join(args[2:]))
        if ship.id == 0:
            await writeError(ctx, config, ErrorType.UNKNOWN_SHIP)
            return
        result = GetStats.getRankedStats(config, playerObject, season, ship)
    else:
        result = GetStats.getRankedStats(config, playerObject, season)

    if result is None:
        # getRankedStats may hand back nothing when the request failed.
        await writeError(ctx, config, ErrorType.UNKNOWN_STATS)
    elif result.hidden:
        await writeError(ctx, config, ErrorType.HIDDEN_STATS)
    elif result.damage == 0:
        await writeError(ctx, config, ErrorType.UNKNOWN_STATS)
    elif ship is None:
        await writeAnswer(ctx, config, playerObject, result)
    else:
        await writeAnswer(ctx, config, playerObject, ship, result)

####### Helper Functions #######

async def writeAnswer(ctx,config,*args):
    """Send a statistics embed for a player, optionally for a single ship.

    ``args`` may hold a Player, a Stats and optionally a Ship in any order;
    they are told apart by type and the last object of each type wins.
    Nothing is sent unless both a Player and a Stats object were passed.
    """
    player = None
    ship = None
    stats = None
    # The translation module is looked up by language code among this
    # module's globals.
    translation = globals()[config.language]
    for arg in args:
        if isinstance(arg, Player):
            player = arg
        elif isinstance(arg, Ship):
            ship = arg
        elif isinstance(arg, Stats):
            stats = arg

    # Identity checks on purpose: Ship overrides __eq__, so "ship != None"
    # would compare the ship's *name* with None.
    if player is None or stats is None:
        return

    if ship is None:
        description = translation.general
    else:
        description = translation.description.format(shipname=ship.name)

    embed = discord.Embed(
        title=translation.title.format(username=player.name),
        url=GetStats.getPlayerLink(config, player),
        description=description,
        color=getColor(stats.avgWins),
    )
    embed.set_author(name="WoWs-Stats-Bot", url="https://github.com/De-Wohli")
    if ship is not None:
        embed.set_thumbnail(url=ship.url)
    embed.add_field(name=translation.battles, value=stats.battles, inline=True)
    embed.add_field(name=translation.avgDamage, value=stats.avgDamage, inline=True)
    embed.add_field(name=translation.winrate, value="{:.2f}%".format(stats.avgWins), inline=True)
    embed.set_footer(text=translation.footer)
    await ctx.send(embed=embed)


async def writeHelp(ctx,config):
    """Send the usage overview in the language configured for the guild."""
    translation = globals()[config.language]
    embed = discord.Embed(
        title=translation.helpHeader,
        description=translation.helpDescription,
        color=discord.Color.teal(),
    )
    embed.set_author(name="WoWs-Stats-Bot")
    # One field per supported command form, in display order.
    usages = (
        ("!stats [player]", translation.helpPlayer),
        ("!stats [player] [shipname]", translation.helpShip),
        ("!statsr [season] [player]", translation.helpRanked),
        ("!statsr [season] [player] [shipname]", translation.helpSRanked),
    )
    for syntax, explanation in usages:
        embed.add_field(name=syntax, value=explanation, inline=False)
    embed.set_footer(text="This bot was made by Fuyu_Kitsune")
    await ctx.send(embed=embed)


async def writeError(ctx,config,errorType):
    """Send an "Error" embed with the translated text for ``errorType``.

    The text is taken from the translation's ``error`` collection, indexed
    by the numeric value of the ErrorType member.
    """
    translation = globals()[config.language]
    embed = discord.Embed(
        title="Error",
        description=translation.error[errorType.value],
        color=discord.Color.dark_teal(),
    )
    embed.set_author(name="WoWs-Stats-Bot")
    embed.set_footer(text=translation.footer)
    await ctx.send(embed=embed)


def getColor(value):
    """Map a win rate in percent to the embed colour of its band.

    Bands are closed at the top: up to 40 red, 45 orange, 50 gold, 53 green,
    56 dark green, 60 teal, 66 purple, above 66 dark purple. A value that
    compares false against every bound yields None.
    """
    # (inclusive upper bound, colour factory), ascending.
    bands = (
        (40, discord.Colour.red),
        (45, discord.Colour.orange),
        (50, discord.Colour.gold),
        (53, discord.Colour.green),
        (56, discord.Color.dark_green),
        (60, discord.Color.teal),
        (66, discord.Color.purple),
    )
    for upperBound, makeColour in bands:
        if value <= upperBound:
            return makeColour()
    if value > 66:
        return discord.Colour.dark_purple()
    return None


def convertSeason(value):
    """Translate the aliases ``s1``..``s4`` into the season ids "101".."104".

    The alias is matched case-insensitively, in line with the
    case-insensitive season check done by the caller. Any other value is
    returned unchanged.
    """
    aliases = {"s1": "101", "s2": "102", "s3": "103", "s4": "104"}
    if isinstance(value, str):
        return aliases.get(value.lower(), value)
    return value

This is the main module, where the commands are parsed. The usual command is !stats [playername] [shipname]. My primary concerns here are the async def stats(ctx, *args): and async def statsr(ctx, *args): functions.

data/classes.py:

from enum import Enum


class Ship:
    """A ship with its numeric id, display name and image URL.

    The defaults (id 0, empty strings) describe "no ship found". Equality is
    based on the name only, so a Ship compares equal to its name string and
    to any other Ship with the same name.
    """

    def __init__(self, id=0, name="", url=""):
        self.id = id
        self.name = name
        self.url = url

    def __eq__(self, other):
        return self.name == other

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hashing the
        # name keeps the hash consistent with the name-based equality.
        # Do not change ``name`` while the ship is stored in a set or dict.
        return hash(self.name)

    def __repr__(self):
        return "Ship(id={!r}, name={!r}, url={!r})".format(self.id, self.name, self.url)


class Player:
    """A player account: id, nickname and the status code of the lookup.

    An id of 0 is what the callers treat as "player not found".
    """

    def __init__(self, id=0, name="", code="404"):
        self.id, self.name, self.code = id, name, code


class Stats:
    """Battle totals plus per-battle averages derived from them.

    ``frags`` and ``damage_dealt`` are stored as floats (the latter under the
    attribute name ``damage``). Every average is 0 when ``battles`` is 0.
    """

    def __init__(self, battles=0, frags=0, damage_dealt=0, wins=0, hidden=False, code=404):
        self.hidden, self.code = hidden, code
        self.battles, self.wins = battles, wins
        self.frags = float(frags)
        self.damage = float(damage_dealt)

    @property
    def avgFrags(self):
        """Frags per battle, rounded to two decimals."""
        return 0 if self.battles == 0 else round(self.frags / self.battles, 2)

    @property
    def avgDamage(self):
        """Damage per battle, rounded to two decimals."""
        return 0 if self.battles == 0 else round(self.damage / self.battles, 2)

    @property
    def avgWins(self):
        """Win rate in percent (ratio rounded to four decimals, then x100)."""
        if self.battles != 0:
            return round(float(self.wins / self.battles), 4)*100
        return 0


class Config:
    """Per-server settings: server id, region code and language code."""

    def __init__(self, serverId=0, region="eu", language="en"):
        self.serverId, self.region, self.language = serverId, region, language


class ReturnVal(Enum):
    """Outcome codes returned by the writing helpers in data/dbcontext.py."""
    SUCCESS = 0
    FAILED = 1
    DOUBLE = 2  # returned by addAsn on a database IntegrityError

class ErrorType(Enum):
    """Kinds of error the bot reports; ``str()`` gives the numeric value."""

    UNKNOWN_PLAYER = 0
    UNKNOWN_SHIP = 1
    UNKNOWN_STATS = 2
    HIDDEN_STATS = 3
    UNKNOWN_SEASON = 4

    def __str__(self):
        return str(self.value)

These are my model classes. Since I’m coming from C#, I’m not sure whether this is an acceptable way of dealing with it in Python.

data/dbcontext.py:

import sys
import os
import mysql.connector

import data.log as log
from data.classes import Config, ReturnVal, Ship
from data.secret import Secret


def connect():
    """Open and return a new MySQL connection using the values in Secret."""
    return mysql.connector.connect(
        host=Secret.dbAddr,
        user=Secret.dbUser,
        passwd=Secret.dbPwd,
        database=Secret.dbName,
    )


def getShip(name):
    """Look a ship up by name, falling back to the alias table ``Asn``.

    Returns the matching Ship, or an empty ``Ship()`` (id 0) when nothing
    matches or the database cannot be queried. Errors are logged, not raised.
    """
    # Tried in order; the second query resolves an alias to its ship.
    queries = (
        'SELECT id,Name,url FROM Ships WHERE name LIKE %s',
        'SELECT id,Name,url FROM Ships WHERE id = (SELECT id FROM Asn WHERE name LIKE %s)',
    )
    con = None  # stays None if connect() itself fails
    try:
        con = connect()
        # Buffered: LIKE may match several rows and only the first one is
        # read; leftover rows would make later calls on the connection fail.
        cursor = con.cursor(buffered=True)
        for sql in queries:
            cursor.execute(sql, (name,))
            row = cursor.fetchone()
            if row is not None:
                return Ship(id=row[0], name=row[1], url=row[2])
        return Ship()
    except Exception as e:
        log.writeLog("getShip", str(e))
        return Ship()
    finally:
        # Read-only access: nothing to commit or roll back, just release.
        if con is not None:
            con.close()



def addAsn(name, asn):
    """Register ``asn`` as an alias for the ship called ``name``.

    Returns ReturnVal.SUCCESS when the alias was stored, ReturnVal.DOUBLE
    when the insert hit an integrity error, and ReturnVal.FAILED when the
    ship does not exist or any other error occurred.
    """
    con = None  # stays None if connect() itself fails
    try:
        con = connect()
        # Buffered so that unread rows of the SELECT cannot block the INSERT.
        cursor = con.cursor(buffered=True)
        cursor.execute('SELECT id,Name,url FROM Ships WHERE name LIKE %s', (name,))
        row = cursor.fetchone()
        if row is None:
            return ReturnVal.FAILED
        cursor.execute(
            'INSERT INTO Asn (name,id,url) VALUES(%s,%s,%s)',
            (asn, row[0], row[2]),
        )
        con.commit()
        return ReturnVal.SUCCESS
    except mysql.connector.IntegrityError:
        return ReturnVal.DOUBLE
    except Exception as e:
        log.writeLog("addAsn", str(e))
        if con is not None:
            con.rollback()
        return ReturnVal.FAILED
    finally:
        if con is not None:
            con.close()

def getConfig(id):
    """Return the Config stored for server ``id``, creating it if missing.

    When no row exists a default Config is inserted and returned. If the
    database cannot be reached or the query fails, the error is logged and
    an unsaved default Config is returned, so callers always get a usable
    object.
    """
    con = None  # stays None if connect() itself fails
    try:
        con = connect()
        cursor = con.cursor()
        cursor.execute('SELECT region,language FROM Config WHERE ServerId = %s', (id,))
        row = cursor.fetchone()
        if row is not None:
            return Config(serverId=id, region=row[0], language=row[1])
        config = Config(serverId=id)
        cursor.execute(
            'INSERT INTO Config(ServerID,region,language) VALUES(%s,%s,%s)',
            (config.serverId, config.region, config.language),
        )
        con.commit()
        return config
    except Exception as e:
        log.writeLog("getConfig", str(e))
        if con is not None:
            con.rollback()
        return Config(serverId=id)
    finally:
        if con is not None:
            con.close()

def addConfig(id):
    """Insert a default configuration (region "eu", language "en") for server ``id``.

    Returns ReturnVal.SUCCESS once the row is committed, otherwise logs the
    error and returns ReturnVal.FAILED.
    """
    con = None  # stays None if connect() itself fails
    try:
        con = connect()
        cursor = con.cursor()
        cursor.execute(
            'INSERT INTO Config (ServerId, region, language) VALUES(%s,%s,%s)',
            (id, "eu", "en"),
        )
        con.commit()
        return ReturnVal.SUCCESS
    except Exception as e:
        log.writeLog("addConfig", str(e))
        if con is not None:
            con.rollback()
        return ReturnVal.FAILED
    finally:
        if con is not None:
            con.close()

def updateConfig(config):
    """Store the region and language of ``config`` for its server.

    Returns ReturnVal.SUCCESS once the update is committed, otherwise logs
    the error and returns ReturnVal.FAILED.
    """
    con = None  # stays None if connect() itself fails
    try:
        con = connect()
        cursor = con.cursor()
        cursor.execute(
            'UPDATE Config SET region = %s, language = %s WHERE ServerID = %s',
            (config.region, config.language, config.serverId),
        )
        con.commit()
        return ReturnVal.SUCCESS
    except Exception as e:
        log.writeLog("updateConfig", str(e))
        if con is not None:
            con.rollback()
        return ReturnVal.FAILED
    finally:
        if con is not None:
            con.close()

In this file I’m handling all database access. The database mainly stores ship names and IDs. Is this way of handling the database connection secure, or is it vulnerable?

data/getStats.py:

import os
import sys
import requests
import json

sys.path.append(os.path.join(os.path.dirname(__file__), "lib"))
sys.path.append(os.path.join(os.path.dirname(__file__),"../data"))
from data.classes import Ship, Player, Stats
from data.secret import Secret
from data.api import api
from data import log


def getPlayer(config,playerName):
    """Search the API for ``playerName`` and return the first match.

    Returns a Player carrying nickname, account id and HTTP status code. A
    Player with id 0 is returned when nothing matches (code 200), when the
    API reports a non-"ok" status (code = HTTP status) or when the request
    fails (code 200, error logged).
    """
    from urllib.parse import quote

    try:
        url = api.psearch.format(
            reg=config.region,
            wgapi=Secret.api,
            # The name comes from a chat message; escape it so it cannot add
            # or alter query parameters.
            playerName=quote(str(playerName), safe=""),
        )
        # Bounded wait so an unresponsive API cannot hang the caller forever.
        response = requests.get(url, timeout=10)
        statuscode = response.status_code
        payload = response.json()
        if payload["status"] != "ok":
            return Player(code=statuscode)
        if payload["meta"]["count"] == 0:
            return Player(code=200)
        firstHit = payload["data"][0]
        return Player(name=firstHit["nickname"], id=firstHit["account_id"], code=statuscode)
    except Exception as e:
        print(str(e))
        log.writeLog("getPlayer",str(e))
        return Player(code=200)


def getPlayerLink(config,player):
    """Build the profile URL: region-specific base, then ``<id>-<name>``."""
    base = str(api.plink).format(reg=config.region)
    return "{}{}-{}".format(base, player.id, player.name)


def getPlayerStats(config,player):
    """Fetch the overall PvP statistics of ``player``.

    Returns a Stats object: filled from the "pvp" section on success, a
    placeholder with ``hidden=True`` when the profile is hidden, and an empty
    Stats (zero battles) when the API has no data, reports a non-"ok" status
    or the request fails (error logged).
    """
    try:
        url = api.pstats.format(reg=config.region,wgapi=Secret.api,accountID=player.id)
        # Bounded wait so an unresponsive API cannot hang the caller forever.
        response = requests.get(url, timeout=10)
        statuscode = response.status_code
        payload = response.json()
        if payload["status"] != "ok":
            return Stats(code=statuscode)
        if not payload["data"]:
            return Stats(code=200)
        if payload["meta"]["hidden"]:
            return Stats(1,1,1,1,True,200)
        pvp = payload["data"][str(player.id)]["statistics"]['pvp']
        return Stats(pvp['battles'], pvp['frags'], pvp['damage_dealt'], pvp['wins'], False, statuscode)
    except Exception as e:
        print(str(e))
        log.writeLog("getPlayerStats",str(e))
        return Stats(code=200)


def getShipStats(config,player,ship):
    """Fetch the PvP statistics of ``player`` on ``ship``.

    Returns a Stats object: filled from the first entry's "pvp" section on
    success, a placeholder with ``hidden=True`` when the profile is hidden,
    and an empty Stats (zero battles) when the API has no data, reports a
    non-"ok" status or the request fails (error logged).
    """
    try:
        url = api.sstats.format(reg=config.region,wgapi=Secret.api,accountID=player.id,shipID=ship.id)
        # Bounded wait so an unresponsive API cannot hang the caller forever.
        response = requests.get(url, timeout=10)
        statuscode = response.status_code
        payload = response.json()
        if payload["status"] != "ok":
            return Stats(code=statuscode)
        if payload["meta"]["hidden"]:
            return Stats(1,1,1,1,True,200)
        data = payload["data"]
        entries = data[str(player.id)] if data else None
        if not entries:
            # No data at all, or none for this player/ship combination.
            return Stats(code=200)
        pvp = entries[0]['pvp']
        return Stats(pvp['battles'], pvp['frags'], pvp['damage_dealt'], pvp['wins'], False, statuscode)
    except Exception as e:
        print(str(e))
        log.writeLog("getShipStats",str(e))
        return Stats(code=200)


def getRankedStats(config,player,season,ship = None):
    """Fetch the ranked statistics of ``player`` for ``season``.

    With ``ship`` given, the per-ship endpoint is queried, otherwise the
    per-player one. The totals of "rank_solo", "rank_div2" and "rank_div3"
    are summed into a single Stats object.

    Always returns a Stats object: a placeholder with ``hidden=True`` for a
    hidden profile, and an empty Stats (zero battles) when the season or the
    data is missing, the API reports a non-"ok" status or the request fails
    (error logged).
    """
    try:
        if ship is not None:
            url = api.rsstats.format(reg=config.region,wgapi=Secret.api,accountID=player.id,shipID = ship.id)
        else:
            url = api.rpstats.format(reg=config.region,wgapi=Secret.api,accountID=player.id)
        # Bounded wait so an unresponsive API cannot hang the caller forever.
        response = requests.get(url, timeout=10)
        statuscode = response.status_code
        payload = response.json()
        if payload["status"] != "ok":
            return Stats(code=statuscode)
        if payload["meta"]["hidden"]:
            return Stats(1,1,1,1,True,200)

        data = payload["data"]
        entry = data[str(player.id)] if data else None
        if entry is None:
            return Stats(code=200)
        if ship is not None:
            # The per-ship endpoint wraps the record in a list.
            entry = entry[0]
        seasons = entry["seasons"]
        if season not in seasons:
            return Stats(code=200)
        currentSeason = seasons[season]

        stats = Stats()
        for mode in ("rank_solo", "rank_div2", "rank_div3"):
            part = currentSeason[mode]
            if part is not None:
                stats.wins += part["wins"]
                stats.damage += part["damage_dealt"]
                stats.battles += part["battles"]
                stats.frags += part["frags"]
        stats.code = 200
        return stats
    except Exception as e:
        log.writeLog("getRankedStats",str(e))
        # Same fallback as the other fetchers, so callers never receive None.
        return Stats(code=200)

This is the communication module for querying the API endpoints. The endpoints are stored in a separate file, data/api.py, which contains a class holding the URL templates, e.g.:

class api:
    # Player-search URL template; filled via str.format with the keys
    # reg (region), wgapi (application id) and playerName (search term).
    psearch = "https://api.worldofwarships.{reg}/wows/account/list/?application_id={wgapi}&search={playerName}"

The code as shown here is currently working and hosted on a Linux server without too many troubles so far, but I’m curious to know about the (probably many) things I could improve.
If needed, I can provide the responses I’m getting from the API, should they be relevant to the code I’ve posted.

In general, I’m seeking assistance to improve my Python programming as well as the performance and stability of my code. Any information or critique is welcome.

Thank you.


Get this bounty!!!

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.