#StackBounty: #python #performance #python-2.x #statistics #status-monitoring Application to monitor device power levels for deviations

Bounty: 100

I have a high-performance, computation-intensive application that runs on a CentOS 7 machine with Python 2.7.5.

I’ll try to explain what the application does:

1. The application runs an infinite loop in which it receives a message from an API call representing the power levels of a device. The message is in Avro, encoded as a JSON string.
2. Each device reports a maximum of 8 separate power level readings (imagine each being a separate HW component within the device). 8 separate components constitute one device, and there are 50 devices in total, so (50 * 8) power level reports are possible.
3. Each of these 8 HW components produces a power report once every 30 s.
4. I have business logic to compute the arithmetic mean of the first 4 components (ids 1-4) and the mean of the last 4 (ids 5-8).
5. For a given device, once all 8 readings have been received, I calculate the two means above and compare each component's reading against its group mean:

        for id in 1 2 3 4: do ( mean_1 - pwr_reading(id) )
        for id in 5 6 7 8: do ( mean_2 - pwr_reading(id) )

6. If the above difference is below a certain threshold, say thresh_first for the first four and thresh_last for the last four, I need to take an action.
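The per-group computation in steps 4-6 can be sketched in plain Python as follows. The readings, the thresholds, and the choice to collect breaching ids into a list are all hypothetical examples, not taken from the original application:

```python
# Sketch of steps 4-6: group means and per-component deviation checks.
# Readings and thresholds below are made-up example values.
readings = {1: -59.2, 2: -58.7, 3: -60.1, 4: -59.5,
            5: -61.0, 6: -60.4, 7: -59.9, 8: -60.2}

first4 = [readings[i] for i in (1, 2, 3, 4)]
last4 = [readings[i] for i in (5, 6, 7, 8)]
mean_1 = sum(first4) / len(first4)   # mean of component ids 1-4
mean_2 = sum(last4) / len(last4)     # mean of component ids 5-8

thresh_first = thresh_last = -1.0    # hypothetical thresholds

# Collect any component whose deviation from its group mean falls
# below the threshold (the "action" in step 6 stands in for this).
breaches = []
for i in (1, 2, 3, 4):
    if mean_1 - readings[i] < thresh_first:
        breaches.append(i)
for i in (5, 6, 7, 8):
    if mean_2 - readings[i] < thresh_last:
        breaches.append(i)
```

With the example values above, no component deviates by more than 1.0 from its group mean, so `breaches` stays empty.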

To model these requirements, I created a device class that holds this information:

    import math  # needed for math.floor() in compare_thresh()

    obj_list = {}

    class DevPwrInfo(object):
        """ The class provides an abstraction of all the processing we do at one
        device level
        """

        def __init__(self, code):
            """ The constructor spins up a new device object initializing the identifiers
            and the necessary data structures needed for the evaluation
            """
            self.code = code
            self.first4_pwr = {}
            self.last4_pwr = {}
            # None rather than "" so an unset mean cannot silently take part
            # in arithmetic
            self.mean_val_first4 = None
            self.mean_val_last4 = None
            self.threshold_breach_list_first4 = []
            self.threshold_breach_list_last4 = []

        def reset_dev_info(self):
            """ Clear the data retained after finishing one round of report
            evaluation
            """
            self.first4_pwr = {}
            self.last4_pwr = {}
            self.mean_val_first4 = None
            self.mean_val_last4 = None

        def add_dev_pwr(self, id, pwr, pwr_valid_flag):
            """ Record one component reading; once a group of 4 is complete,
            compute its mean and run the threshold comparison.
            (The def line was missing from the post; the name comes from the
            comment in pwr_report_msg_decode below.)
            """
            if 1 <= int(id) <= 4:
                self.first4_pwr[id] = pwr if pwr_valid_flag else 0.0
            else:
                self.last4_pwr[id] = pwr if pwr_valid_flag else 0.0

            if len(self.first4_pwr) == 4:
                self.mean_val_first4 = self.compute_mean(self.first4_pwr.values())
                self.compare_thresh('first4')
                self.first4_pwr.clear()

            if len(self.last4_pwr) == 4:
                # Bug fix: the original assigned the mean to self.last4_pwr itself
                self.mean_val_last4 = self.compute_mean(self.last4_pwr.values())
                self.compare_thresh('last4')
                self.last4_pwr.clear()

        def compute_mean(self, pwr_list):
            return float(sum(pwr_list)) / max(len(pwr_list), 1)

        def compare_thresh(self, group):
            # thresh_dict is assumed to be a module-level dict of per-device
            # thresholds (not shown in the post)
            low_thresh = thresh_dict[self.code]

            if group == 'first4':
                pwr_dict = self.first4_pwr
                mean_val = self.mean_val_first4
                breach_list = self.threshold_breach_list_first4
            else:
                pwr_dict = self.last4_pwr
                mean_val = self.mean_val_last4
                breach_list = self.threshold_breach_list_last4

            for id, pwr in pwr_dict.iteritems():
                if int(math.floor(mean_val - pwr)) < int(low_thresh):
                    # The action is elided in the original post; recording the
                    # breaching component id is one plausible choice
                    breach_list.append(id)

    def pwr_report_msg_decode(message):
        """ Handler for the raw idb message from the API
        """
        if message is not None:
            # Look up the device object by its string identifier and let
            # add_dev_pwr() update the per-group readings. The call below is
            # reconstructed from the truncated post; the str-to-float and
            # str-to-bool conversions are added here.
            obj_list[message['code']].add_dev_pwr(message['id'],
                                                  float(message['pwr']),
                                                  message['valid_flag'] == 'True')

    # obj_list is a dict keyed by device name, with the device object as value
    if __name__ == "__main__":
        # allowed_devices_list contains the list of 44 device names
        allowed_devices_list = ['abc01', 'def01', 'xyz01']
        for device in allowed_devices_list:
            obj_list[device] = DevPwrInfo(device)

        while True:
            # The API produces messages in this format (the stray quote in the
            # original valid_flag value is dropped)
            msg = {"code": "abc01", "id": "3", "pwr": "-59.2", "valid_flag": "True"}
            pwr_report_msg_decode(msg)

So my question is: how do I make each of the 44 device objects process messages in parallel rather than sequentially in one thread? I've looked at ThreadPoolExecutor but am not sure how to make it computationally optimal.
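One hedged sketch of the ThreadPoolExecutor approach (on Python 2.7 this requires the `futures` backport; the sketch below is written so it also runs on Python 3, and `process_msg` is a hypothetical stand-in for `pwr_report_msg_decode`):

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sketch: hand each decoded message to a worker pool so the
# per-device bookkeeping does not block the receive loop. process_msg is
# a stand-in for the real handler and just parses the fields it needs.
def process_msg(msg):
    return msg['code'], float(msg['pwr'])

messages = [{'code': 'abc01', 'id': '3', 'pwr': '-59.2', 'valid_flag': 'True'},
            {'code': 'def01', 'id': '7', 'pwr': '-61.0', 'valid_flag': 'True'}]

with ThreadPoolExecutor(max_workers=4) as pool:
    # map() preserves input order in its results
    results = list(pool.map(process_msg, messages))
```

Two caveats worth noting: because of the GIL, threads only help if the handlers block (e.g. on I/O or the downstream "action"); purely CPU-bound dict updates this light may run faster single-threaded. And if two messages for the same device can be in flight at once, the shared `DevPwrInfo` state would need a per-device lock or a per-device serial queue.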

