## Background

For a future workshop I’ll have to fit arbitrary functions (independent variable is height `z`

) to data from multiple sources (output of different numerical weather prediction models) in a yet unknown format (but basically gridded height/value pairs). The functions only have to interpolate the data and be differentiable. There should explicitly be no theoretical background for the type of function, but they should be smooth. The goal is to use the gridded (meaning discrete) output of the numerical weather prediction model in our pollutant dispersion model, which requires continuous functions.

## Workflow

- choose the input model
- load input data
- define list of variables (not necessarily always the same)
- define height ranges (for the piecewise function)
- define base functions like “a0 + a1*z” for each height range and variable
- optionally define weights, because some parts are more important that others
- fit the piecewise functions
- save the fitted functions and their derivatives as Fortran 90 free form source code (to be included in our model)

I don’t think 1.-6. can be automated, but the rest should be.

## Code

```
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit
from sympy import log, ln, Piecewise, lambdify, symbols, sympify, fcode, sqrt
def config(name):
"""Configuration of the piecewise function fitting, dependent on input name
Input:
name... name of experiment to fit data to, basically chooses settings
Output:
var_list... list of variables to fit
infunc_list_dict... dictionary with var_list as keys, each having a list as
value that contains strings with the sub-function to fit, from
the bottom up. Only the first (lowest) may have a constant value, all
others must be 0 at the height they "take over" (where their argument
is 0). There, the value of the lower, fitted function is added to
ensure continuity. The parameters for each function HAVE to be of the
pattern "aX", where "X" is numerically increasing (0, 1, 2...) within
each sub-function.
The arguments of aloft functions (not the bottom most) are usually
"z - t", unless there is some trickery with "s"
A constant, first sub-function is 'a0', while constant sub-function
aloft has to be '0' for technical reasons.
Variables replaced by values:
- t... current threshold height
- s... transition value at height t
- zi.. bounday layer height
thresh_list_dict... dictionary with var_list as keys, each having a list as
value that contains the height where the piecewise functions change.
for technical reasons the ground (0) and the top (np.inf) are also
included.
weight_list_dict... dictionary with var_list as keys, each having a list as
value that contains relative weights (to 1) that are used to force the
fitting to be closer to the real value at crucial points. This is
around the threshold heights, at the ground and at the ABL. To "turn
off" a weight, set it to 1. The first weight is at the ground and then
there are two around each treshold height and the last at the top.
i.e: [ground,
lower-of-thresh0, upper-of-thresh0,
lower-of-thresh1, upper-of-thresh1,
...
top]
the first function uses ground and lower-of-thresh0,
the second uses upper-of-thresh0 and lower-of-thresh1 until
the last uses lower-of-threshI and top
wefact_list_dict... analog to weight_list_dict, except that it contains
the relative distance where the weight in weight_list_dict is applied.
Relative distance means here: fraction of the total subrange. Typical
values are 0.1 or 0.2, meaning 10 or 20% of the total subrange take the
accompanying weight. If the corresponding weight equals 1, the value
has no influence.
teston... True: create plots; False: don't
saveon... True: don't show plots, save them as pdfs (only if teston==True).
printon... True: print output to console; False: don't
"""
teston = True
saveon = False
printon = False
# ========= TMP220 =========
if name == 'tmp220':
abl_height = 990
var_list = ['um', 'u2', 'v2', 'w2', 'w3', 'uw', 'eps']
infunc_list_dict = {
'um': ['a0*ln(z-t)**3 + a1*ln(z-t)**2 + a2*ln(z-t) + a3'],
'u2': ['a0 + a1*(z-t) + a2*(z-t)**2 + a3*(z-t)**3 + a4*(z-t)**4 + a5*(z-t)**5',
'a0*(z-t) + a1*(z-t)**2'],
'v2': ['a0 + a1*(z-t) + a2*(z-t)**2 + a3*(z-t)**3 + a4*(z-t)**4 + a5*(z-t)**5',
'a0*(z-t) + a1*(z-t)**2'],
'w2': ['a0 + a1*(z-t) + a2*(z-t)**2 + a3*(z-t)**3 + a4*(z-t)**4 + a5*(z-t)**5',
'a0*(z-t) + a1*(z-t)**2'],
'w3': ['a0',
'0'],
'uw': ['a0 + a1*(z-t) + a2*(z-t)**2 + a3*(z-t)**3 + a4*(z-t)**4 + a5*(z-t)**5',
'a0*(z-t) + a1*(z-t)**2 + a2*(z-t)**3 + a3*(z-t)**4'],
'eps': ['a0 + a1*(z-t) + a2*(z-t)**2 + a3*(z-t)**3 + a4*(z-t)**4 + a5*(z-t)**5',
'a0*(z-t)**a1 + a2*(z-t)**3 + a3*(z-t)**2 + a4*(z-t)**4 + a5*(z-t)**6'],
}
thresh_list_dict = {
'um': [0.0, np.inf],
'u2': [0.0, 12.5, np.inf],
'v2': [0.0, 12.5, np.inf],
'w2': [0.0, 12.5, np.inf],
'w3': [0.0, 12.5, np.inf],
'uw': [0.0, 12.5, np.inf],
'eps': [0.0, 12.5, np.inf],
}
weight_list_dict = {
'um': [100, 1],
'u2': [100, 5000, 1, 1],
'v2': [100, 5000, 1, 1],
'w2': [100, 5000, 1, 1],
'w3': [100, 5000, 1, 1],
'uw': [100, 5000, 1, 1],
'eps': [100, 5000, 1, 1],
}
wefact_list_dict = {
'um': [0.2, 0.1],
'u2': [0.2, 0.2, 0.1, 0.1],
'v2': [0.2, 0.2, 0.1, 0.1],
'w2': [0.2, 0.2, 0.1, 0.1],
'w3': [0.2, 0.2, 0.1, 0.1],
'uw': [0.2, 0.2, 0.1, 0.1],
'eps': [0.2, 0.2, 0.1, 0.1],
}
#elif name == 'SOMETHING ELSE': analog to above, omitted for brevity
else:
raise ValueError('Unsupported name, configure in config()')
return (var_list, abl_height, infunc_list_dict, thresh_list_dict,
weight_list_dict, wefact_list_dict, teston, saveon, printon)
def read_scm_data(name_str):
"""This routines reads in the profiles from the SCMs
Input: # TODO (depends on their format), for now dummy data
Output: dataframe: z, u2, v2, w2, w3, uw, um, eps
"""
# TODO: add actual read routine, this is just dummy input
if name_str == 'tmp220':
out = pd.read_csv('tmp220.csv', delimiter=',')
#elif name_str == 'SOMETHING ELSE': as above, omitted for brevity
else:
raise ValueError('Unknown name, configure in read_scm_data()')
return out
def test_fit(name, var_list, func_dict, data, saveon):
"""plot of data vs fitted functions
"""
# Omitted for brevity, not that relevant
def fit_func(var, abl_height, data_z, data_v, infunc_str_list,
thresh_list, weight_list, wefact_list):
"""Converts the piecewise defined functions in infunc_str_list with the
thresholds in thresh_list (and the weights defined by weight_list and
wefact_list) to a SymPy expression and fits it to (data_z, data_v), where
data_z is height and data_v are the values in each height. Returns the
piecewise SymPy function with substituded parameters.
"""
z = symbols('z')
y_list = [] # holds the subfunctions
niterations = 20000
# transition_value holds the value that is added to each sub-function
# to ensure a continuous function. this is obviously 0 for the first
# subfunction and equal to the value of the previous sub-function at the
# threshold height for each subsequent sub-function.
transition_value = 0
# for each piece of the function:
for i, func_str in enumerate(infunc_str_list):
# find number of parameters and create those SymPy objects
nparams = func_str.count('a')
a = symbols('a0:%d' % nparams)
t = symbols('t') # transition height
s = symbols('s') # transition value
zi = symbols('zi') # boundary layer height
# check the string and create the sympy expression
verify_func_str(var, func_str)
y_list.append(sympify(func_str))
# add the transition value and substitute the placeholder variables:
y_list[i] += transition_value
y_list[i] = y_list[i].subs(t, thresh_list[i])
y_list[i] = y_list[i].subs(s, transition_value)
y_list[i] = y_list[i].subs(zi, abl_height)
# lambdify the sympy-expression with a somewhat ugly hack:
t = [z]
for j in range(nparams):
t.append(a[j])
func = lambdify(tuple(t), y_list[i], modules=np)
# create the correction subset of the data
local_index = data_z > thresh_list[i] & data_z < thresh_list[i + 1]
local_z = data_z[local_index]
local_v = data_v[local_index]
# create the weight arrays. they have the same size as the local_z and
# are 1 everywhere except the range defined with wefact, where they
# are the specified weight. see config() for definitions.
weight = np.ones_like(local_z)
z_range = local_z[-1] - local_z[0]
lower_weight_lim = local_z[0] + wefact_list[2*i] * z_range
upper_weight_lim = local_z[-1] - wefact_list[2*i + 1] * z_range
weight[local_z < lower_weight_lim] = weight_list[2*i]
weight[local_z > upper_weight_lim] = weight_list[2*i + 1]
sigma = 1. / weight
# fit the function to the data, checking for constant function aloft:
if nparams > 0:
popt, pcov = curve_fit(func, local_z, local_v, sigma=sigma,
maxfev=niterations)
# substitute fitted parameters in sympy expression:
for j in range(nparams):
y_list[i] = y_list[i].subs(a[j], popt[j])
# calculate the new transition_value:
if nparams > 0:
transition_value = func(thresh_list[i + 1], *popt)
else:
transition_value = func(thresh_list[i + 1])
# After all sub-functions are fitted, combine them to a piecewise function.
# This is a terrible hack, but I couldn't find out how to create piecewise
# functions dynamically...
if len(y_list) == 1:
y = y_list[0]
elif len(y_list) == 2:
y = Piecewise((y_list[0], z <= thresh_list[1]),
(y_list[1], True))
elif len(y_list) == 3:
y = Piecewise((y_list[0], z <= thresh_list[1]),
(y_list[1], z <= thresh_list[2]),
(y_list[2], True))
elif len(y_list) == 4:
y = Piecewise((y_list[0], z <= thresh_list[1]),
(y_list[1], z <= thresh_list[2]),
(y_list[2], z <= thresh_list[3]),
(y_list[3], True))
elif len(y_list) == 5:
y = Piecewise((y_list[0], z <= thresh_list[1]),
(y_list[1], z <= thresh_list[2]),
(y_list[2], z <= thresh_list[3]),
(y_list[3], z <= thresh_list[4]),
(y_list[4], True))
else:
raise ValueError('More than five sub-functions not implemented yet')
return y
def create_deriv(funcname, func):
"""Creates the derivative of the function, taking into account that v2 has
two "derivatives".
careful: returns tuple of two functions if funcname==v2, else one function
"""
z = symbols('z')
if funcname != 'v2':
return func.diff(z)
else:
deriv = func.diff(z)
deriv_sig = sqrt(func).diff(z)
return (deriv, deriv_sig)
def verify_input(name, infunc_list, thresh_list,
weight_list, wefact_list):
"""rudimentary checks if the functions, weights and thresholds are faulty
"""
nfuncs = len(infunc_list)
if len(thresh_list) != nfuncs + 1:
raise ValueError('Number of functions and thresholds disagree for ' +
var + ' of ' + name)
if len(weight_list) != nfuncs * 2:
raise ValueError('Number of functions and weights disagree for ' +
var + ' of ' + name)
if len(wefact_list) != nfuncs * 2:
raise ValueError('Number of functions and weight factors disagree' +
' for ' + var + ' of ' + name)
def verify_func_str(var, func_str):
"""Checks if the function string has linearly increasing parameters,
starting with 0 (i.e. "a0, a1, a2..."), because otherwise there is only
a cryptic error in minpack.
"""
index_list = []
for c, char in enumerate(func_str):
if char == 'a':
index_list.append(int(func_str[c+1]))
if list(range(0, len(index_list))) != index_list:
raise ValueError(func_str + ' has non-monotonically increasing' +
'parameter indices or does not start with a0' +
' in variable ' + var)
def main(name, var_list, abl_height, infunc_list_dict, thresh_list_dict,
weight_list_dict, wefact_list_dict, teston, saveon, printon):
"""Start routines, print output (if printon), save Fortran functions in a
file and start testing (if teston)
"""
func_dict, deri_dict = {}, {}
# file_str stores everything that is written to file in one string
file_str = '!' + 78*'=' + 'n' + '! ' + name + 'n!' + 78*'=' + 'n'
if printon:
print(' ' + 78*'_' + ' ')
print('|' + 78*' ' + '|')
print('|' + 30*' ' + name + (48-len(name))*' ' + '|')
print('|' + 78*' ' + '|')
data = read_scm_data(name)
for var in var_list:
verify_input(name, infunc_list_dict[var], thresh_list_dict[var],
weight_list_dict[var], wefact_list_dict[var])
if printon:
print('! ----- ' + var)
file_str += '! ----- ' + var + 'n'
# use data.z.values to get rid of the pandas-overhead and because
# some stuff is apparently not possible otherwise (like data.z[-1])
func_dict[var] = fit_func(var, abl_height, data.z.values,
data[var].values, infunc_list_dict[var],
thresh_list_dict[var], weight_list_dict[var],
wefact_list_dict[var])
func_fstr = fcode(func_dict[var], source_format='free', assign_to=var,
standard=95)
if printon:
print(func_fstr)
file_str += func_fstr + 'n'
if var != 'v2':
deri_dict[var] = create_deriv(var, func_dict[var])
deri_fstr = fcode(deri_dict[var], source_format='free',
assign_to='d'+var)
if printon:
print(deri_fstr)
file_str += deri_fstr + 'nn'
else:
deri_dict[var], deri_dict['sigv'] = create_deriv(var,
func_dict[var])
deri_fstr = fcode(deri_dict[var], source_format='free',
assign_to='d'+var, standard=95)
deri2_fstr = fcode(deri_dict['sigv'], source_format='free',
assign_to='dsigv', standard=95)
file_str += deri_fstr + 'n'
file_str += deri2_fstr + 'nn'
if printon:
print(deri_fstr)
print(deri2_fstr)
if printon:
print('')
if printon:
print('|' + 78*'_' + '|n')
file_str = file_str + 'nn' # end with newlines
if teston:
test_fit(name, var_list, func_dict, data, saveon)
# save fortran functions in file:
filename = name + '_turbparas.inc'
with open(filename, 'w') as f:
f.write(file_str)
if __name__ == '__main__':
name = 'tmpBUB'
main(name, *config(name))
```

The code is currently not runable, because the input-file is missing. I couldn’t find a canonical way to upload data, please advise. It’s currently a 160kB .csv file.

The code runs and does what I want, but I don’t have any formal training in programming and I’m sure it could be improved. Speed is not a huge issue (really depends on how complicated the functions to be fit are), but reliability and adaptability are.

Some points I know are “wrong”:

- too many comments that explain what the code does (I like those, because I forget stuff)
- the input strings for the base sub-functions are too long (>80) and lack spaces around
`*`

. It’s a compromise.
- if a height-range has less data points than the parameters of the corresponding subfunction, the code halts with a helpful minpack error message.

Some details I’d like to be different but also know to be impossible without changing `sympy`

:

- Fortran output with 4 instead of 3 spaces
- Fortran output with 132 line length instead of 80.
- A dynamic way to combine the pieces to one piecewise function to avoid those
`if`

conditions at the end of ‘fit_func’. (maybe that is possible?)

