#StackBounty: #python #performance #python-2.x #statistics #status-monitoring Application to monitor device power levels for deviations

Bounty: 100

I have a high-performance, computation-intensive application that runs on a CentOS 7 machine with Python 2.7.5.

I’ll try to explain what the application does:

  1. The application runs an infinite loop in which it receives messages from an API call, each representing a power level of a device. The message is in Avro and encoded as a JSON string.
  2. Each device reports at most 8 separate power level readings, one per HW component within the device; 8 components constitute one device. There are 50 devices in total, so up to 50 * 8 = 400 power level reports are possible.
  3. Each of these 8 HW components produces a power report once every 30 s.
  4. I have business logic that computes the arithmetic mean of the first 4 components (ids 1-4) and the mean of the last 4 components (ids 5-8).
  5. For a given device, once I have received all 8 readings, I calculate the above means and take the difference between each group’s mean and the individual readings in that group, i.e. mean_1 for the first 4 and mean_2 for the last 4.
    for id in 1 2 3 4: do ( mean_1 - pwr_reading(id) )
    for id in 5 6 7 8: do ( mean_2 - pwr_reading(id) )
    
  6. If the above difference is below a certain threshold, say thresh_first for the first four and thresh_last for the last four, I need to take an action.
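
To make steps 4-6 concrete, here is a minimal standalone sketch of the calculation for one device. The function names `group_deviations` and `breaches` are hypothetical and are not part of my application; the sketch assumes all 8 readings are already available as floats keyed by integer component id.

```python
def group_deviations(readings):
    """Return {component_id: group_mean - reading} for one device.

    `readings` maps component ids 1-8 to power levels (floats).
    """
    deviations = {}
    for group in ((1, 2, 3, 4), (5, 6, 7, 8)):
        values = [readings[i] for i in group]
        mean = sum(values) / float(len(values))
        for i in group:
            deviations[i] = mean - readings[i]
    return deviations


def breaches(deviations, thresh_first, thresh_last):
    """Return the sorted component ids whose deviation is below its group threshold."""
    return sorted(i for i, d in deviations.items()
                  if d < (thresh_first if i <= 4 else thresh_last))
```

For example, with readings of -60.0 for ids 1-3 and -56.0 for id 4, mean_1 is -59.0, so id 4 has a deviation of -3.0 and would be flagged for any thresh_first above -3.0.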


So to model the above requirements, I’ve created a device class that holds this information:

import math

# thresh_dict (device code -> low threshold) is defined elsewhere in the application
obj_list = {}

class DevPwrInfo(object):
    """ The class provides an abstraction of all the processing we do at one
    device level
    """

    def __init__(self, code):
        """ The constructor spins up a new device object initializing the identifiers
        and the necessary data structures needed for the evaluation
        """
        self.code = code
        self.first4_pwr = {}
        self.last4_pwr = {}
        self.mean_val_first4 = ""
        self.mean_val_last4 = ""
        self.threshold_breach_list_first4 = []
        self.threshold_breach_list_last4 = []


    def reset_dev_info(self):
        """ Clear the data retained after finishing one round of report
        evaluation
        """
        self.first4_pwr = {}
        self.last4_pwr = {}
        self.mean_val_first4 = ""
        self.mean_val_last4 = ""

    def add_dev_pwr(self, id, pwr, pwr_valid_flag):
        # valid_flag arrives as the string 'True'/'False' (assumed), so compare
        # explicitly; an invalid reading is stored as 0.0
        valid = str(pwr_valid_flag) == 'True'
        value = float(pwr) if valid else 0.0
        if 1 <= int(id) <= 4:
            self.first4_pwr[id] = value
        else:
            self.last4_pwr[id] = value

        # Evaluate a group as soon as all four of its readings have arrived
        if len(self.first4_pwr) == 4:
            self.mean_val_first4 = self.compute_mean(self.first4_pwr.values())
            self.compare_thresh('first4')
            self.first4_pwr.clear()

        if len(self.last4_pwr) == 4:
            self.mean_val_last4 = self.compute_mean(self.last4_pwr.values())
            self.compare_thresh('last4')
            self.last4_pwr.clear()

    def compute_mean(self, pwr_list):
        return (float(sum(pwr_list)) / max(len(pwr_list), 1))

    def compare_thresh(self, type):
        low_thresh  = thresh_dict[self.code]

        if type == 'first4':
            pwr_dict = self.first4_pwr
            mean_val = self.mean_val_first4
        else:
            pwr_dict = self.last4_pwr
            mean_val = self.mean_val_last4

        for id, pwr in pwr_dict.iteritems():
            if int(math.floor(mean_val - ( pwr ))) < int(low_thresh):
                print("Will add more logic here")

def pwr_report_msg_decode(message):
    """ Handler for the raw idb message from the API
    """
    if message is not None:
        # This handler is called once per message from the API call. It looks
        # up the device object by its string identifier, and 'add_dev_pwr'
        # updates that device's readings
        obj_list[message['code']].add_dev_pwr( message['id'],
                                               message['pwr'],
                                               message['valid_flag'])

# obj_list is a dict that maps each device name to its DevPwrInfo object
if __name__ == "__main__":
    # allowed_devices_list contains the 44 device names (shortened here)
    allowed_devices_list = [ 'abc01', 'def01', 'xyz01' ]
    for device in allowed_devices_list:
        obj_list[device] = DevPwrInfo(device)

    while True:
        # The API produces messages in this format
        msg = { "code": "abc01", "id": "3", "pwr": "-59.2", "valid_flag": "True" }
        pwr_report_msg_decode(msg)
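
Since every field in the message arrives as a string, here is a minimal sketch of how a raw message could be normalized before it reaches `add_dev_pwr`. It assumes the payload is a plain JSON string with exactly the four fields shown above, and that only the literal string "True" marks a valid reading; `parse_pwr_msg` is a hypothetical helper, not something in my application.

```python
import json


def parse_pwr_msg(raw):
    """Convert a JSON-encoded power report into typed fields (hypothetical helper)."""
    msg = json.loads(raw)
    return {
        "code": msg["code"],
        "id": int(msg["id"]),
        "pwr": float(msg["pwr"]),
        # Assumption: only the literal string "True" counts as valid;
        # any other value is treated as an invalid reading.
        "valid_flag": str(msg["valid_flag"]).strip() == "True",
    }
```

With this, a string such as "False" is no longer treated as truthy, and the power level can be summed directly.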

So my question is: how do I make each of the 44 objects run in parallel rather than sequentially in one thread? I’ve looked at ThreadPoolExecutor, but I’m not sure how to use it in a computationally optimal way.

