#StackBounty: #python #python-3.x #multithreading #python-requests Python multi connection downloader resuming after pausing makes down…

Bounty: 100

I have written a Python script that downloads a single file using 32 connections if available.

I have written a multiconnection downloader that works fine without pausing, but won’t stop downloading after resuming, the progress would go beyond 100%…

Like this:

Download mode: Multi-thread (press Space to pause/resume, press Escape to stop)                                                                                                             
[████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████] 120% completed, paused: False                                    
Download mode: Multi-thread (press Space to pause/resume, press Escape to stop)                                                                                                             
1798.08 MiB downloaded, 1489.83 MiB total, -308.25 MiB remaining, download speed: 22.73 MiB/s                                                                                               
Minimum speed: 0.00 MiB/s, average speed: 4.54 MiB/s, maximum speed: 75.00 MiB/s                                                                                                            
Task started on 2021-08-09 16:57:03, 00:06:35 elapsed, ETA: -1:59:47

After progress exceeds 100%, there will be error messages like this:

Exception in thread Thread-78:
Traceback (most recent call last):
  File "C:Program FilesPython39libthreading.py", line 973, in _bootstrap_inner
    self.run()
  File "C:Program FilesPython39libthreading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "D:MyScriptdownloader.py", line 70, in multidown
    mm[position: position+len(chunk)] = chunk
IndexError: mmap slice assignment is wrong size

(The above doesn’t include all of the error message)

I have encountered all sorts of errors after resuming, but most importantly, the server will often send extra bytes from previous request, whose connection is dead and needless to say this breaks the whole code.

How should I implement pause and resume correctly?

I am thinking about multiprocessing, I assume the sessions and connections are all PID and port number related, and so far I haven’t encountered a new run of the script that received extra bytes from previous runs of the script, so I guess using another process with a new PID and new port number plus requests.session() plus {'connection': 'close'} for each download should guarantee that no extra bytes from previous connections will be received, I just don’t know how to share variables between processes…

The code:
downloader.py

import json
import keyboard
import os
import re
import requests
import sys
import time
import validators
from collections import deque
from datetime import datetime, timedelta
from math import inf
from mmap import mmap
from pathlib import Path
from ping3 import ping
from reprint import output
from threading import Thread


def timestring(sec):
    sec = int(sec)
    m, s = divmod(sec, 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s:02d}'


class Downloader:
    def __init__(self):
        self.recent = deque([0] * 12, maxlen=12)
        self.recentspeeds = deque([0] * 200, maxlen=200)
        self.paused = False
        self.progress = dict()

    class Multidown:
        def __init__(self, obj, id):
            self.count = 0
            self.position = 0
            self.completed = False
            self.id = id
            self.parent = obj
        
        def multidown(self, url, start, end):
            interrupted = False
            s = requests.session()
            s.headers.update({'connection': 'close', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0'})
            r = s.get(
                url, headers={'range': 'bytes={0}-{1}'.format(start, end)}, stream=True)
            length = int(r.headers['content-length'])
            while end - length + (self.id != self.parent.progress['connections'] - 1) != start or r.status_code != 206:
                r.close()
                s.close()
                del r
                del s
                time.sleep(0.02)
                s = requests.session()
                r = s.get(
                    url, headers={'range': 'bytes={0}-{1}'.format(start, end)}, stream=True)
                length = int(r.headers['content-length'])
            self.position = start
            
            for chunk in r.iter_content(1048576):
                if self.parent.paused:
                    self.parent.mm.flush()
                    r.connection.close()
                    r.close()
                    s.close()
                    del r
                    del s
                    interrupted = True
                    break
                if chunk:
                    self.parent.mm[self.position: self.position+len(chunk)] = chunk
                    self.count += len(chunk)
                    self.position += len(chunk)
                    self.parent.progress[self.id]['count'] = self.count
                    self.parent.progress[self.id]['position'] = self.position
            if not interrupted:
                r.close()
                s.close()
            if self.count == self.parent.progress[self.id]['length']:
                self.completed = True
                self.parent.progress[self.id]['completed'] = True
                self.parent.mm.flush()
    
    class Singledown:
        def __init__(self):
            self.count = 0
        def singledown(self, url, path):
            with requests.get(url, stream=True) as r:
                with path.open('wb') as file:
                    for chunk in r.iter_content(1048576):
                        if chunk:
                            self.count += len(chunk)
                            file.write(chunk)

    def download(self, url, filepath, num_connections=32, overwrite=False):
        singlethread = False
        threads = []
        bcontinue = False
        filepath = filepath.replace('\', '/')
        if (not re.match('^[a-zA-Z]:/(((?![<>:"/|?*]).)+((?<![ .])/)?)*$', filepath) or
                not Path(filepath[:3]).exists()):
            print('Invalid windows file path has been inputted, process will now stop.')
            return
        if not validators.url(url):
            print('Invalid url been inputted, process will now stop.')
            return
        if url.lower().startswith('ftp://'):
            print(
                "`requests` module doesn't suport File Transfer Protocol, process will now stop")
            return
        path = Path(filepath)
        if not path.exists():
            bcontinue = True
        else:
            if path.is_file():
                if overwrite:
                    bcontinue = True
                else:
                    while True:
                        answer = input(
                            f'`{filepath}` already exists, do you want to overwrite it? n(Yes, No):').lower()
                        if answer in ['y', 'yes', 'n', 'no']:
                            if answer.startswith('y'):
                                os.remove(filepath)
                                bcontinue = True
                            break
                        else:
                            print('Invalid input detected, retaking input.')
        if not bcontinue:
            print(
                f'Overwritting {filepath} has been aborted, process will now stop.')
            return
        bcontinue = False
        server = url.split('/')[2]
        ok = ping(server, timeout=2)
        if ok == False:
            print(
                'The server of the inputted url is non-existent, process will now stop.')
            return
        if ok:
            bcontinue = True
        if not ok:
            print('Connection has timed out, will reattempt to ping server 5 times.')
            for i in range(5):
                print(
                    f'Reattempting to ping server, retrying {i + 1} out of 5')
                ok = ping(server, timeout=2)
                if ok:
                    print(
                        f'Connection successful on retry {i + 1}, process will now continue.')
                    bcontinue = True
                    break
                else:
                    print(f'Retry {i + 1} out of 5 timed out' + (i != 4)
                          * ', reattempting in 1 second.' + (i == 4) * '.')
                    time.sleep(1)
        if not bcontinue:
            print('Failed to connect server, connection timed out, process will now stop')
            return
        bcontinue = False
        head = requests.head(url)
        if head.status_code == 200:
            bcontinue = True
        else:
            for i in range(5):
                print(f'Server responce is invalid, retrying {i + 1} out of 5')
                head = requests.head(url)
                if head.status_code == 200:
                    print(
                        f'Connection successful on retry {i + 1}, process will now continue.')
                    bcontinue = True
                    break
                else:
                    print(f'Retry {i + 1} out of 5 failed to access data' +
                          (i != 4) * ', reattempting in 1 second.' + (i == 4) * '.')
                    time.sleep(1)
        if not bcontinue:
            print("Can't establish a connection with access to data, can't download target file, process will now stop.")
            return
        folder = '/'.join(filepath.split('/')[:-1])
        Path(folder).mkdir(parents=True, exist_ok=True)
        headers = head.headers
        total = headers.get('content-length')
        if not total:
            print(
                f'Cannot find the total length of the content of {url}, the file will be downloaded using a single thread.')
            started = datetime.now()
            print('Task started on %s.' %
                    started.strftime('%Y-%m-%d %H:%M:%S'))
            sd = self.Singledown()
            th = Thread(target=sd.singledown, args=(url, path))
            threads.append(sd)
            th.start()
            total = inf
            singlethread = True
        else:
            total = int(total)
            if not headers.get('accept-ranges'):
                print(
                    'Server does not support the `range` parameter, the file will be downloaded using a single thread.')
                started = datetime.now()
                print('Task started on %s.' %
                        started.strftime('%Y-%m-%d %H:%M:%S'))
                sd = self.Singledown()
                th = Thread(target=sd.singledown, args=(url, path))
                threads.append(sd)
                th.start()
                singlethread = True
            else:
                segment = total / num_connections
                started = datetime.now()
                lastpressed = started
                path.touch()
                file = path.open('wb')
                file.seek(total - 1)
                file.write(b'')
                file.close()
                file = path.open(mode='r+b')
                self.mm = mmap(file.fileno(), 0)
                print('Task started on %s.' %
                        started.strftime('%Y-%m-%d %H:%M:%S'))
                self.progress['total'] = total
                self.progress['connections'] = num_connections
                for i in range(num_connections):
                    md = self.Multidown(self, i)
                    start = int(segment * i)
                    end = int(segment * (i + 1)) - (i != num_connections - 1)
                    length = end - start + (i != num_connections - 1)
                    th = Thread(target=md.multidown, args=(
                        url, start, end))
                    threads.append(md)
                    self.progress[i] = dict()
                    self.progress[i]['start'] = start
                    self.progress[i]['position'] = start
                    self.progress[i]['end'] = end
                    self.progress[i]['count'] = 0
                    self.progress[i]['length'] = length
                    self.progress[i]['completed'] = False
                    th.start()
                Path(filepath + '.progress.json').write_text(json.dumps(self.progress, indent=4))
        downloaded = 0
        totalMiB = total / 1048576
        speeds = []
        interval = 0.04
        with output(initial_len=5, interval=0) as dynamic_print:
            while True:
                Path(filepath + '.progress.json').write_text(json.dumps(self.progress, indent=4))
                status = sum([i.completed for i in threads])
                downloaded = sum(i.count for i in threads)
                self.recent.append(downloaded)
                done = int(100 * downloaded / total)
                doneMiB = downloaded / 1048576
                gt0 = len([i for i in self.recent if i])
                if not gt0:
                    speed = 0
                else:
                    recent = list(self.recent)[12 - gt0:]
                    if len(recent) == 1:
                        speed = recent[0] / 1048576 / interval
                    else:
                        diff = [b - a for a, b in zip(recent, recent[1:])]
                        speed = sum(diff) / len(diff) / 1048576 / interval
                speeds.append(speed)
                self.recentspeeds.append(speed)
                nzspeeds = [i for i in speeds if i]
                if nzspeeds:
                    minspeed = min(nzspeeds)
                else:
                    minspeed = 0
                maxspeed = max(speeds)
                now = datetime.now()
                elapsed = (now - started).total_seconds()
                meanspeed = downloaded / elapsed / 1048576
                remaining = totalMiB - doneMiB
                dynamic_print[0] = '[{0}{1}] {2}'.format(
                    'u2588' * done, 'u00b7' * (100-done), str(done)) + '% completed' + (not singlethread) * ', paused: {0}'.format(self.paused)
                dynamic_print[1] = 'Download mode: ' + singlethread * 
                    'Single-thread' + (not singlethread) * 'Multi-thread (press Space to pause/resume, press Escape to stop)'
                dynamic_print[2] = '{0:.2f} MiB downloaded, {1:.2f} MiB total, {2:.2f} MiB remaining, download speed: {3:.2f} MiB/s'.format(
                    doneMiB, totalMiB, remaining, speed)
                if speed and total != inf:
                    eta = timestring(remaining / speed)
                else:
                    eta = '99:59:59'
                dynamic_print[3] = 'Minimum speed: {0:.2f} MiB/s, average speed: {1:.2f} MiB/s, maximum speed: {2:.2f} MiB/s'.format(
                    minspeed, meanspeed, maxspeed)
                dynamic_print[4] = 'Task started on {0}, {1} elapsed, ETA: {2}'.format(
                    started.strftime('%Y-%m-%d %H:%M:%S'), timestring(elapsed), eta)
                if keyboard.is_pressed('space'):
                    if not singlethread:
                        pressed = datetime.now()
                        if (pressed - lastpressed).total_seconds() > 0.5:
                            lastpressed = pressed
                            if self.paused:
                                for i, md in enumerate(threads):
                                    if not md.completed:
                                        th = Thread(target=md.multidown, args=(
                                            url, self.progress[i]['position'], self.progress[i]['end']))
                                        th.start()
                            self.paused = not self.paused
                if keyboard.is_pressed('esc'):
                    if not singlethread:
                        ended = datetime.now()
                        self.paused = True
                        break
                if status == len(threads):
                    if not singlethread:
                        self.mm.close()
                    ended = datetime.now()
                    break
                time.sleep(interval)
        time_spent = (ended - started).total_seconds()
        meanspeed = total / time_spent / 1048576
        status = sum([i.completed for i in threads])
        if status == len(threads):
            print('Task completed on {0}, total time elapsed: {1}, average speed: {2:.2f} MiB/s'.format(
                ended.strftime('%Y-%m-%d %H:%M:%S'), timestring(time_spent), meanspeed))
        else:
            print('Task interrupted on {0}, total time elapsed: {1}, average speed: {2:.2f} MiB/s'.format(
                ended.strftime('%Y-%m-%d %H:%M:%S'), timestring(time_spent), meanspeed))

if __name__ == '__main__':
    d = Downloader()
    d.download(*sys.argv[1:])

For testing purposes this is a dumbed-down version of the script, with all checks removed while retaining the same functionality (sorry it really takes all these lines to show the download information):

import json
import os
import requests
import sys
import time
from collections import deque
from datetime import datetime, timedelta
from math import inf
from mmap import mmap
from pathlib import Path
from reprint import output
from threading import Thread


def timestring(sec):
    sec = int(sec)
    m, s = divmod(sec, 60)
    h, m = divmod(m, 60)
    return f'{h:02d}:{m:02d}:{s:02d}'


class Downloader:
    def __init__(self):
        self.recent = deque([0] * 12, maxlen=12)
        self.recentspeeds = deque([0] * 200, maxlen=200)
        self.paused = False
        self.progress = dict()
        self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0'

    class Multidown:
        def __init__(self, obj, id):
            self.count = 0
            self.position = 0
            self.completed = False
            self.id = id
            self.parent = obj
            self.UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0'
        
        def multidown(self, url, start, end):
            interrupted = False
            s = requests.session()
            s.headers.update({'connection': 'close', 'user-agent': self.UA})
            r = s.get(
                url, headers={'range': 'bytes={0}-{1}'.format(start, end)}, stream=True)
            length = int(r.headers['content-length'])
            while end - length + (self.id != self.parent.progress['connections'] - 1) != start or r.status_code != 206:
                r.close()
                s.close()
                del r
                del s
                time.sleep(0.02)
                s = requests.session()
                r = s.get(
                    url, headers={'range': 'bytes={0}-{1}'.format(start, end)}, stream=True)
                length = int(r.headers['content-length'])
            self.position = start
            
            for chunk in r.iter_content(1048576):
                if self.parent.paused:
                    self.parent.mm.flush()
                    r.connection.close()
                    r.close()
                    s.close()
                    del r
                    del s
                    interrupted = True
                    break
                if chunk:
                    self.parent.mm[self.position: self.position+len(chunk)] = chunk
                    self.count += len(chunk)
                    self.position += len(chunk)
                    self.parent.progress[self.id]['count'] = self.count
                    self.parent.progress[self.id]['position'] = self.position
            if not interrupted:
                r.close()
                s.close()
            if self.count == self.parent.progress[self.id]['length']:
                self.completed = True
                self.parent.progress[self.id]['completed'] = True
                self.parent.mm.flush()
    

    def download(self, url, filepath, num_connections=32, overwrite=False):
        singlethread = False
        threads = []
        bcontinue = False
        filepath = filepath.replace('\', '/')
        if Path(filepath).exists():
            os.remove(filepath)
        folder = '/'.join(filepath.split('/')[:-1])
        Path(folder).mkdir(parents=True, exist_ok=True)
        head = requests.head(url, headers={'user-agent': self.UA})
        path = Path(filepath)
        headers = head.headers
        total = headers.get('content-length')
        if total:
            total = int(total)
            if headers.get('accept-ranges'):
                segment = total / num_connections
                started = datetime.now()
                lastpressed = started
                path.touch()
                file = path.open('wb')
                file.seek(total - 1)
                file.write(b'')
                file.close()
                file = path.open(mode='r+b')
                self.mm = mmap(file.fileno(), 0)
                print('Task started on %s.' %
                        started.strftime('%Y-%m-%d %H:%M:%S'))
                self.progress['total'] = total
                self.progress['connections'] = num_connections
                for i in range(num_connections):
                    md = self.Multidown(self, i)
                    start = int(segment * i)
                    end = int(segment * (i + 1)) - (i != num_connections - 1)
                    length = end - start + (i != num_connections - 1)
                    th = Thread(target=md.multidown, args=(
                        url, start, end))
                    threads.append(md)
                    self.progress[i] = dict()
                    self.progress[i]['start'] = start
                    self.progress[i]['position'] = start
                    self.progress[i]['end'] = end
                    self.progress[i]['count'] = 0
                    self.progress[i]['length'] = length
                    self.progress[i]['completed'] = False
                    th.start()
                Path(filepath + '.progress.json').write_text(json.dumps(self.progress, indent=4))
        downloaded = 0
        totalMiB = total / 1048576
        speeds = []
        interval = 0.04
        with output(initial_len=5, interval=0) as dynamic_print:
            while True:
                Path(filepath + '.progress.json').write_text(json.dumps(self.progress, indent=4))
                status = sum([i.completed for i in threads])
                downloaded = sum(i.count for i in threads)
                self.recent.append(downloaded)
                done = int(100 * downloaded / total)
                doneMiB = downloaded / 1048576
                gt0 = len([i for i in self.recent if i])
                if not gt0:
                    speed = 0
                else:
                    recent = list(self.recent)[12 - gt0:]
                    if len(recent) == 1:
                        speed = recent[0] / 1048576 / interval
                    else:
                        diff = [b - a for a, b in zip(recent, recent[1:])]
                        speed = sum(diff) / len(diff) / 1048576 / interval
                speeds.append(speed)
                self.recentspeeds.append(speed)
                nzspeeds = [i for i in speeds if i]
                if nzspeeds:
                    minspeed = min(nzspeeds)
                else:
                    minspeed = 0
                maxspeed = max(speeds)
                now = datetime.now()
                elapsed = (now - started).total_seconds()
                meanspeed = downloaded / elapsed / 1048576
                remaining = totalMiB - doneMiB
                dynamic_print[0] = '[{0}{1}] {2}'.format(
                    'u2588' * done, 'u00b7' * (100-done), str(done)) + '% completed' + (not singlethread) * ', paused: {0}'.format(self.paused)
                dynamic_print[1] = 'Download mode: ' + singlethread * 
                    'Single-thread' + (not singlethread) * 'Multi-thread (press Space to pause/resume, press Escape to stop)'
                dynamic_print[2] = '{0:.2f} MiB downloaded, {1:.2f} MiB total, {2:.2f} MiB remaining, download speed: {3:.2f} MiB/s'.format(
                    doneMiB, totalMiB, remaining, speed)
                if speed and total != inf:
                    eta = timestring(remaining / speed)
                else:
                    eta = '99:59:59'
                dynamic_print[3] = 'Minimum speed: {0:.2f} MiB/s, average speed: {1:.2f} MiB/s, maximum speed: {2:.2f} MiB/s'.format(
                    minspeed, meanspeed, maxspeed)
                dynamic_print[4] = 'Task started on {0}, {1} elapsed, ETA: {2}'.format(
                    started.strftime('%Y-%m-%d %H:%M:%S'), timestring(elapsed), eta)
                if PAUSE:
                    if not singlethread:
                        pressed = datetime.now()
                        if (pressed - lastpressed).total_seconds() > 0.5:
                            lastpressed = pressed
                            if self.paused:
                                for i, md in enumerate(threads):
                                    if not md.completed:
                                        th = Thread(target=md.multidown, args=(
                                            url, self.progress[i]['position'], self.progress[i]['end']))
                                        th.start()
                            self.paused = not self.paused
                if status == len(threads):
                    if not singlethread:
                        self.mm.close()
                    ended = datetime.now()
                    break
                time.sleep(interval)
        time_spent = (ended - started).total_seconds()
        meanspeed = total / time_spent / 1048576
        status = sum([i.completed for i in threads])
        if status == len(threads):
            print('Task completed on {0}, total time elapsed: {1}, average speed: {2:.2f} MiB/s'.format(
                ended.strftime('%Y-%m-%d %H:%M:%S'), timestring(time_spent), meanspeed))
        else:
            print('Task interrupted on {0}, total time elapsed: {1}, average speed: {2:.2f} MiB/s'.format(
                ended.strftime('%Y-%m-%d %H:%M:%S'), timestring(time_spent), meanspeed))

if __name__ == '__main__':
    import hashlib
    global PAUSE
    PAUSE = False
    chash = '5674E59283D95EFE8C88770515A9BBC80CBB77CB67602389FD91DEF26D26AED2'
    d = Downloader()
    if sys.argv[1] == '0':
        d.download('http://ipv4.download.thinkbroadband.com/1GB.zip', 'C:/test/1GB.zip')
    elif sys.argv[1] == '1':
        th1 = Thread(target=d.download, args=('http://ipv4.download.thinkbroadband.com/1GB.zip', 'C:/test/1GB.zip'))
        th1.start()
        def test():
            while th1.is_alive():
                global PAUSE
                PAUSE = not PAUSE
                time.sleep(10)
        th2 = Thread(target=test)
        th2.start()
        while th1.is_alive():
            pass
    sha256_hash = hashlib.sha256()
    with open('C:/test/1GB.zip',"rb") as f:
        for byte_block in iter(lambda: f.read(1048576),b""):
            sha256_hash.update(byte_block)
    print(sha256_hash.hexdigest().lower() == chash.lower())

The url isn’t accessible without a VPN in my locale, and test 0 always results True, that is, if the connection hasn’t gone dead during the download, and test 1 sometimes results True, sometimes results False, sometimes it doesn’t finish(progress bar goes beyond 100%)…

How can my code be salvaged?


Get this bounty!!!

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.