DES using SimPy#
Basic model structure (using object-oriented programming)#
There are two main ways to program in Python:
- Functional - write functions, each designed to do one thing
- Object-oriented - create objects which have attributes and methods, defined as classes. This is useful when you have lots of logic to handle, or when you want to make several copies of a similar thing.
A basic structure of a SimPy model:
- Class to store global model parameters (you won't create instances of this; instead, you refer to it directly whenever you need a parameter, hence it might be named in lower case)
- Class to represent an entity (which holds the entity's attributes)
- Class to represent the system (which holds the generators and entity journeys, and stores the SimPy environment)
- Class to represent a trial (i.e. a batch of simulation runs, with methods to run them, and to store, record and display results from the trial) [source]
```mermaid
flowchart TD;
    param("<b>Global model parameters</b>");
    entity("<b>Entity</b><br>Attributes");
    system("<b>System</b><br>Generators<br>Resources<br>Entity journeys<br>Stores simpy environment");
    trial("<b>Trial</b><br>Run model<br>Record and display result")
```
Where to store results:
```mermaid
flowchart TD;
    param("<b>Global model parameters</b>");
    entity("<b>Entity</b><br>Entity queue time");
    system("<b>System</b><br>Create and populate<br>results dataframe<br>with row for each entity");
    trial("<b>Trial</b><br>Run model<br>Record and display result")
```
View full code example
Code source: HSMA
This is a basic example (e.g. it doesn't use the sim-tools distributions, so lacks built-in reproducibility)
```python
import simpy
import random
import pandas as pd


# Global parameter values
# Don't make an instance of this class; just access it directly
class g:
    patient_inter = 5
    mean_n_consult_time = 6
    number_of_nurses = 1
    sim_duration = 120
    number_of_runs = 5


# Entity (Patient) with two attributes: ID and time queueing
class Patient:
    def __init__(self, p_id):
        self.id = p_id
        self.q_time_nurse = 0


class Model:
    # Constructor to set up the model for a run
    def __init__(self, run_number):
        self.env = simpy.Environment()
        self.patient_counter = 0
        self.nurse = simpy.Resource(self.env, capacity=g.number_of_nurses)
        self.run_number = run_number

        # Blank dataframe for results
        self.results_df = pd.DataFrame()
        self.results_df["Patient ID"] = [1]
        self.results_df["Q Time Nurse"] = [0.0]
        self.results_df["Time with Nurse"] = [0.0]
        self.results_df.set_index("Patient ID", inplace=True)

        # Blank attribute to store mean queue time of the run
        self.mean_q_time_nurse = 0

    # Generator function for patients arriving
    def generator_patient_arrivals(self):
        # Infinite loop does this indefinitely during the simulation run
        while True:
            self.patient_counter += 1

            # Create a new patient
            p = Patient(self.patient_counter)

            # Send them through the process
            self.env.process(self.attend_clinic(p))

            # Sample time to next patient (inter-arrival time)
            sampled_inter = random.expovariate(1.0 / g.patient_inter)

            # Freeze this instance of this function until that time has
            # elapsed
            yield self.env.timeout(sampled_inter)

    # Generator function for a patient going through the clinic
    def attend_clinic(self, patient):
        # Wait start time
        start_q_nurse = self.env.now

        # Request a nurse resource
        with self.nurse.request() as req:
            # Queue until the resource is available
            yield req

            # Wait end time
            end_q_nurse = self.env.now
            patient.q_time_nurse = end_q_nurse - start_q_nurse

            # Sample time with nurse
            sampled_nurse_act_time = random.expovariate(
                1.0 / g.mean_n_consult_time)

            # Store queue time and time with nurse in results
            self.results_df.at[patient.id, "Q Time Nurse"] = (
                patient.q_time_nurse)
            self.results_df.at[patient.id, "Time with Nurse"] = (
                sampled_nurse_act_time)

            # Freeze this instance of this function (i.e. for this entity)
            # for the time spent with the nurse
            yield self.env.timeout(sampled_nurse_act_time)

        # Sink (patient leaves the model here)

    def calculate_run_results(self):
        # Find mean queue time
        self.mean_q_time_nurse = self.results_df["Q Time Nurse"].mean()

    def run(self):
        # Start up the DES entity generator that creates new patients
        self.env.process(self.generator_patient_arrivals())

        # Run the model
        self.env.run(until=g.sim_duration)

        # Calculate results
        self.calculate_run_results()

        # Print results
        print(f"Run Number {self.run_number}")
        print(self.results_df)


class Trial:
    # Set up dataframe to store results from each run
    def __init__(self):
        self.df_trial_results = pd.DataFrame()
        self.df_trial_results["Run Number"] = [0]
        self.df_trial_results["Mean Q Time Nurse"] = [0.0]
        self.df_trial_results.set_index("Run Number", inplace=True)

    # Print trial results
    def print_trial_results(self):
        print("Trial Results")
        print(self.df_trial_results)

    def run_trial(self):
        for run in range(g.number_of_runs):
            # Conduct a run of the model
            my_model = Model(run)
            my_model.run()

            # Store results from the run
            self.df_trial_results.loc[run] = [my_model.mean_q_time_nurse]

        # Print trial results
        self.print_trial_results()


# To run the above, create an instance of the Trial class and run it
my_trial = Trial()
my_trial.run_trial()
```
When we use env.timeout, it is specific to each entity (i.e. time passes for just that entity), so if you kill a process, only that entity is affected. However, it is possible to have certain types of process interrupt each other. [source]
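A minimal sketch of one process interrupting another (an illustrative example, not from the source; SimPy's built-in `Process.interrupt()` method and `simpy.Interrupt` exception are the real API):

```python
import simpy


def consultation(env):
    try:
        # The consultation would normally take 10 time units
        yield env.timeout(10)
        print(f'{env.now}: consultation finished')
    except simpy.Interrupt as interrupt:
        # Another process interrupted this one before it finished
        print(f'{env.now}: consultation interrupted ({interrupt.cause})')


def emergency(env, consult_process):
    # After 3 time units, interrupt the consultation
    yield env.timeout(3)
    consult_process.interrupt(cause='emergency arrival')


env = simpy.Environment()
consult_process = env.process(consultation(env))
env.process(emergency(env, consult_process))
env.run()
```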
To add another activity, just write it after the first one in the pathway generator function (but outside the first activity's with statement; otherwise the entity will hold onto the previous activity's resource during the new activity, unless that is what you want) - see example.
You can model branching paths using conditional logic - see example, and the sketch below.
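A minimal sketch combining both ideas, as a replacement for the attend_clinic generator in the model above (the doctor resource, the patient.q_time_doctor attribute, and the g.prob_seeing_doctor and g.mean_d_consult_time parameters are hypothetical additions, not from the source):

```python
def attend_clinic(self, patient):
    # Activity 1: see the nurse
    start_q_nurse = self.env.now
    with self.nurse.request() as req:
        yield req
        patient.q_time_nurse = self.env.now - start_q_nurse
        yield self.env.timeout(
            random.expovariate(1.0 / g.mean_n_consult_time))
    # The with block has closed, so the nurse is released here

    # Branching path: only some patients go on to see a doctor
    # (g.prob_seeing_doctor is a hypothetical parameter, e.g. 0.3)
    if random.uniform(0, 1) < g.prob_seeing_doctor:
        # Activity 2: see the doctor (a hypothetical second resource)
        start_q_doctor = self.env.now
        with self.doctor.request() as req:
            yield req
            patient.q_time_doctor = self.env.now - start_q_doctor
            yield self.env.timeout(
                random.expovariate(1.0 / g.mean_d_consult_time))
```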
Alternative layouts#
The notes below mostly explain modifications based on the HSMA model layout. However, there are lots of different ways you can choose to lay out a SimPy model. Some other examples are provided below…
Basic example from Tom on MSc
You can see it's pretty similar. Ignoring the slight differences in what is being modelled (and that it does not collect results), the main differences in how the code is structured are:
- No class for global parameters - they are defined within the script
- The function for the patient attending the clinic (service()) is within Patient rather than Model
- The Environment, Resource, run and process calls sit outside the Model class
```python
import itertools

import numpy as np
import simpy


class Patient:
    '''
    Encapsulates the process a patient caller undergoes when they dial 111
    and speak to an operator who triages their call.
    '''
    def __init__(self, identifier, env, operators):
        '''
        Constructor method

        Params:
        -----
        identifier: int
            a numeric identifier for the patient.
        env: simpy.Environment
            the simulation environment
        operators: simpy.Resource
            the call operators
        '''
        self.identifier = identifier
        self.env = env
        self.operators = operators

    def service(self):
        '''
        simulates the service process for a call operator

        1. request and wait for a call operator
        2. phone triage (triangular)
        3. exit system
        '''
        # record the time that the call entered the queue
        start_wait = self.env.now

        # request an operator
        with self.operators.request() as req:
            yield req

            # record the waiting time for the call to be answered
            self.waiting_time = self.env.now - start_wait
            print(f'operator answered call {self.identifier} at '
                  + f'{self.env.now:.3f}')

            # sample call duration
            call_duration = np.random.triangular(left=0.01, mode=0.12,
                                                 right=0.16)
            yield self.env.timeout(call_duration)

            print(f'call {self.identifier} ended {self.env.now:.3f}; '
                  + f'waiting time was {self.waiting_time:.3f}')


class UrgentCareCallCentre:
    '''
    111 call centre model
    '''
    def __init__(self, env, operators):
        '''
        Constructor method

        Params:
        --------
        env: simpy.Environment
        operators: simpy.Resource
            A pool of call operators who triage incoming patient calls.
        '''
        self.env = env
        self.operators = operators

    def arrivals_generator(self):
        '''
        IAT is exponentially distributed with a constant mean.
        '''
        # use itertools as it provides an infinite loop with a counter
        # variable
        for caller_count in itertools.count(start=1):
            # 100 calls per hour (units = hours). Time between calls is 1/100
            inter_arrival_time = np.random.exponential(1/100)
            yield self.env.timeout(inter_arrival_time)

            print(f'call arrives at: {self.env.now:.3f}')
            new_caller = Patient(caller_count, self.env, self.operators)
            self.env.process(new_caller.service())


# model parameters
RUN_LENGTH = 0.25
N_OPERATORS = 13

# create simpy environment and operator resources
env = simpy.Environment()
operators = simpy.Resource(env, capacity=N_OPERATORS)

# create a model
model = UrgentCareCallCentre(env, operators)
env.process(model.arrivals_generator())
env.run(until=RUN_LENGTH)
print(f'end of run. simulation clock time = {env.now}')
```
Producing entity arrivals (using generator functions)#
What are generator functions?
In a normal function we return values: return hands back a value and then terminates the function. In a generator function we yield values: yield hands back a value and then pauses execution, saving the function's state.
Local variables and their states are remembered between successive calls of the generator function.
Calling a generator function creates and returns an iterator object.
We can use multiple yield statements in the generator function
Example:
```python
# Source: https://www.geeksforgeeks.org/generators-in-python/

# Generator function
def simple_generator():
    yield 1
    yield 2
    yield 3


# Generator object
x = simple_generator()

# Iterating over the generator object using next
print(next(x))
print(next(x))
print(next(x))
```
Why use them?
- Easier to implement than iterators
- Memory efficient - a normal function that returns a sequence has to build the whole sequence in memory before it can return it, whilst a generator function produces one item at a time
- Infinite sequences - we can't store an infinite sequence in memory, but as generators only produce one item at a time, they can represent an infinite stream of data [source]
You can also write a Python **generator expression** (instead of a function), which is a short-hand form of a generator function. Just as lambda functions create anonymous functions, generator expressions create anonymous generator functions; they are written like list comprehensions but with round brackets. The difference:
- A list comprehension returns a list of items
- A generator expression returns an iterable (generator) object
Example:
```python
x = 10

# List comprehension - returns a list
evens_list = [i for i in range(x) if i % 2 == 0]

# Generator expression - returns a generator object
evens_gen = (i for i in range(x) if i % 2 == 0)
```
In the models above, generator functions produce the entities arriving into a process:
- Set up the SimPy environment, customer counter, resource, run number, and a dataframe to store results
- Create a generator function to represent customer arrivals: it creates a new customer, sends them through another generator (e.g. for calls), then moves time forward by the inter-arrival time (sampled from a distribution)
- Have a generator function to represent the customer calling the helpline: it requests a resource, calculates how long they queued, samples the time they then spend with the resource, and moves forward by that amount of time (i.e. freezes that function in place for the sampled activity time) [source]
Resources#
Requesting multiple resources simultaneously#
For example, you may need to request both a nurse and a room before an appointment can proceed.
To set this up, create the resources, with attributes and columns in the results dataframe to record queue time and appointment time.
You'll need conditional logic to check whether you have one resource (and need to wait for the other), or both resources available at once.
Below, we request the resources without a with block (so without indentation), manually specifying when to stop using each resource - this makes the code easier to write when requesting multiple resources at once.
```python
# There are two ways to request a resource in SimPy...

# Method 1: a with block, which releases the resource automatically at
# the end of the block
with self.nurse.request() as req:
    yield req
    # Rest of code here, to run whilst holding the resource

# Method 2: no with block, so the resource must be released manually
nurse_request = self.nurse.request()
yield nurse_request
# Rest of code here, to run whilst holding the resource
self.nurse.release(nurse_request)
```
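A minimal sketch of requesting two resources simultaneously using method 2, inside a pathway generator (the room resource and sampled_appointment_time are hypothetical; the & operator is SimPy's built-in way to wait for multiple events to succeed):

```python
# Request both resources without with blocks
nurse_request = self.nurse.request()
room_request = self.room.request()  # hypothetical second resource

# Wait until BOTH the nurse and the room are available
yield nurse_request & room_request

# Appointment takes place here, holding both resources
yield self.env.timeout(sampled_appointment_time)

# Manually release both resources
self.nurse.release(nurse_request)
self.room.release(room_request)
```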
Priority queue#
Create priority-based queueing with PriorityResource. It's like the standard SimPy Resource class, but with added functionality to select which entity to pull out of the queue based on a priority value. To use it:
- Set up the resource using PriorityResource rather than Resource
- Give each entity an attribute holding its priority (lower value = higher priority)
- When requesting the PriorityResource, tell it which attribute it should use to determine the entity's place in the queue
If there are several people waiting, it will simply pick the person with the highest priority. If people have the same priority, it will choose whoever has waited longest.
Remember that our results only include people who get seen by the nurse - so when the model stops, there could still be lots of people waiting in the queue - you'll want to account for that. [source]
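A minimal sketch of the changes (the random triage score is illustrative; simpy.PriorityResource and its request(priority=...) argument are the real SimPy API):

```python
# In Model.__init__: use PriorityResource rather than Resource
self.nurse = simpy.PriorityResource(self.env, capacity=g.number_of_nurses)

# When creating a patient, give them a priority (lower = more urgent);
# here an illustrative random triage score from 1 to 5
patient.priority = random.randint(1, 5)

# In the pathway generator: pass the priority with the request
with self.nurse.request(priority=patient.priority) as req:
    yield req
    # ... time with nurse as before ...
```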
Sampling from distributions#
Sim-tools contains a bunch of distribution classes you can use. These are set up to have individual seeds, to easily enable reproducibility.
Example:
```python
from sim_tools.distributions import Exponential

# Later in code...
self.patient_inter_arrival_dist = Exponential(
    mean=g.patient_inter, random_seed=self.run_number*2)
```
The Exponential class from the package (you'll see there are actually only a few lines of code):
```python
class Exponential(Distribution):
    """
    Convenience class for the exponential distribution.
    Packages up distribution parameters, seed and random generator.
    """
    def __init__(self, mean: float, random_seed: Optional[int] = None):
        """
        Constructor

        Params:
        ------
        mean: float
            The mean of the exponential distribution

        random_seed: int, optional (default=None)
            A random seed to reproduce samples. If set to None then a unique
            sample is created.
        """
        super().__init__(random_seed)
        self.mean = mean

    def sample(self, size: Optional[int] = None) -> float | np.ndarray:
        """
        Generate a sample from the exponential distribution

        Params:
        -------
        size: int, optional (default=None)
            the number of samples to return. If size=None then a single
            sample is returned.
        """
        return self.rng.exponential(self.mean, size=size)
```
Variable arrival rates (i.e. time-dependent arrivals)#
You could have variable arrival rates (e.g. high in the afternoon, low at night). To model this you can use sim-tools, which has a class that creates a non-stationary Poisson process via thinning. [source]
What is a non-stationary Poisson process with thinning?
First, we define a random process (or stochastic process) as a collection of random variables, usually indexed by time. At each time point, the value is a random variable. Random processes can be classified as continuous-time or discrete-time.

One example of a random process is the Poisson process. This is a continuous-time random process.[source] It is typically used to model the occurrence of something that happens at a certain rate over some time interval but is otherwise completely random, with occurrences independent of each other (e.g. we know an area has a rate of 2 earthquakes a month, although their timing is otherwise completely random).[source]

A Poisson process - also known as a stationary, time-stationary or time-homogeneous Poisson process - has a fixed, constant rate over time. However, if the rate varies over time (e.g. a higher rate of arrivals in the afternoon than at night), then it is a non-stationary Poisson process.[source] Hence, a non-stationary Poisson process is an arrival process with an arrival rate that varies by time.[source]

A non-stationary Poisson process can be generated either by thinning or by rate inversion. The thinning method involves first generating a stationary Poisson process with a constant rate, then rejecting potential arrivals at a given time with a given probability. [source] In other words, thinning is an acceptance-rejection sampling method[source] from a time-dependent distribution where each time period follows its own exponential distribution.[source]
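To make the algorithm concrete, here is a minimal standalone sketch of thinning (illustrative, not the sim-tools implementation): generate candidate arrivals at the maximum rate lambda_max, then accept each candidate at time t with probability lambda(t)/lambda_max.

```python
import numpy as np


def nspp_thinning_sample(rate_fn, lambda_max, run_length, rng=None):
    """Generate arrival times from a non-stationary Poisson process.

    rate_fn: function returning the arrival rate lambda(t) at time t
    lambda_max: the maximum of lambda(t) over the run
    """
    rng = np.random.default_rng() if rng is None else rng
    arrivals = []
    t = 0.0
    while True:
        # Candidate arrival from a stationary process at the maximum rate
        t += rng.exponential(1.0 / lambda_max)
        if t >= run_length:
            break
        # Accept the candidate with probability lambda(t) / lambda_max
        if rng.random() <= rate_fn(t) / lambda_max:
            arrivals.append(t)
    return arrivals


# e.g. an illustrative arrival rate that rises and falls over the run
arrival_times = nspp_thinning_sample(
    rate_fn=lambda t: 10 + 5 * np.sin(t / 10), lambda_max=15, run_length=60)
```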
To set this up we (see the sketch below):
- Provide a dataframe with the mean IAT at various time points
- Set up the arrival distribution in the Model class with NSPPThinning() [source]
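A minimal sketch of this setup, assuming the NSPPThinning API from sim-tools (a dataframe with 't' and 'arrival_rate' columns, two random seeds, and a sample() method that takes the current simulation time) - check the sim-tools documentation for the exact signature:

```python
import pandas as pd
from sim_tools.time_dependent import NSPPThinning

# Mean arrival rate at various time points (illustrative values)
arrivals_df = pd.DataFrame({
    't': [0, 60, 120, 180],
    'arrival_rate': [1/5, 1/3, 1/8, 1/10]  # arrivals per time unit
})

# In Model.__init__
self.arrivals_dist = NSPPThinning(
    data=arrivals_df,
    random_seed1=self.run_number * 2,
    random_seed2=self.run_number * 2 + 1)

# In the arrivals generator: sample conditional on the current time
sampled_inter = self.arrivals_dist.sample(simulation_time=self.env.now)
yield self.env.timeout(sampled_inter)
```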
Tracking resource utilisation#
Method 1. Simple average across the run#
We can track how long each entity spends using a resource, then sum that time and divide it by the time elapsed in the model multiplied by the number of resources in the model (see the sketch below).
This may slightly overestimate utilisation though as we record time spent with the resource prior to that time elapsing - e.g. “If we have a model run time of 600 time units, and someone reaches the nurse at unit 595 but has an activity time of 30, we will record that and use it in our calculations - even though only 5 minutes of that time actually took place during our model run.” You could adapt code to account for that (e.g. check if time remaining in model is greater than activity time sampled, and record whichever of those is smaller). [source]
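A minimal sketch of the basic calculation, assuming the results dataframe from the model above (the nurse_utilisation attribute name is illustrative, and this version does not apply the adjustment described in the quote above):

```python
# In calculate_run_results(): total time entities spent with the nurse
total_time_with_nurse = self.results_df["Time with Nurse"].sum()

# Utilisation = busy time / (run length * number of resources)
self.nurse_utilisation = (
    total_time_with_nurse / (g.sim_duration * g.number_of_nurses))
```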
Method 2. Interval audit process#
For this, we create a function that takes:
A list of resources to monitor, and
A time interval at which to snapshot utilisation
This is integrated into the code, and then set up as a process. [source]
Overview of how the HSMA code is modified for auditing:

| Class | Changes |
|---|---|
| Global model parameters | • Set audit interval (e.g. 5 min) |
| Patient | - |
| Model | • Generator function to get metrics at given intervals, storing results as an attribute |
| Trial | • Get audit results from model attributes |
That is the HSMA example. We can also look at a different example from Tom (whose basic code structure differs - see above). He does not modify the Model or Patient classes, but instead creates an Auditor class that performs the same job - getting metrics at given intervals.
- Audits occur at first_obs and then interval time units apart
- queues and service are lists storing tuples that identify what to audit
- metrics is a dict to store the audit results
- scheduled_observation() uses env.timeout() for the delay between audits; record_queue_length() finds the length of the queues (for each service, appending the length of its queue to the dictionary)
Tom’s Auditor class
```python
import numpy as np
import pandas as pd


class Auditor:
    def __init__(self, env, run_length, first_obs=None, interval=None):
        '''
        Auditor Constructor

        Params:
        -----
        env: simpy.Environment

        first_obs: float, optional (default=None)
            Time of first scheduled observation. If None then no scheduled
            audit will take place

        interval: float, optional (default=None)
            Time period between scheduled observations. If None then no
            scheduled audit will take place
        '''
        self.env = env
        self.first_observation = first_obs
        self.interval = interval
        self.run_length = run_length

        self.queues = []
        self.service = []

        # dict to hold states
        self.metrics = {}

        # schedule the periodic audits
        if first_obs is not None:
            env.process(self.scheduled_observation())
            env.process(self.process_end_of_run())

    def add_resource_to_audit(self, resource, name, audit_type='qs'):
        if 'q' in audit_type:
            self.queues.append((name, resource))
            self.metrics[f'queue_length_{name}'] = []

        if 's' in audit_type:
            self.service.append((name, resource))
            self.metrics[f'system_{name}'] = []

    def scheduled_observation(self):
        '''
        simpy process to control the frequency of
        auditor observations of the model.

        The first observation takes place at self.first_obs
        and subsequent observations are spaced self.interval
        apart in time.
        '''
        # delay first observation
        yield self.env.timeout(self.first_observation)
        self.record_queue_length()
        self.record_calls_in_progress()

        while True:
            yield self.env.timeout(self.interval)
            self.record_queue_length()
            self.record_calls_in_progress()

    def record_queue_length(self):
        for name, res in self.queues:
            self.metrics[f'queue_length_{name}'].append(len(res.queue))

    def record_calls_in_progress(self):
        for name, res in self.service:
            self.metrics[f'system_{name}'].append(res.count + len(res.queue))

    def process_end_of_run(self):
        '''
        Create an end of run summary

        Returns:
        ---------
        pd.DataFrame
        '''
        yield self.env.timeout(self.run_length - 1)

        run_results = {}

        for name, res in self.queues:
            queue_length = np.array(self.metrics[f'queue_length_{name}'])
            run_results[f'mean_queue_{name}'] = queue_length.mean()

        for name, res in self.service:
            total_in_system = np.array(self.metrics[f'system_{name}'])
            run_results[f'mean_system_{name}'] = total_in_system.mean()

        self.summary_frame = pd.Series(run_results).to_frame()
        self.summary_frame.columns = ['estimate']
```
Testing a large number of scenarios#
We run scenarios with different levels of resources, etc. To test a large number, the method here is recommended. It involves (see the sketch after this list):
- A dictionary of scenarios
- Using itertools to create all possible permutations of the scenarios
- Adding the scenario name to the g class, then enumerating through the scenarios and running each one, storing the results as you go [source]
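A minimal sketch of this approach, assuming the g and Trial classes from the basic example above (the parameter names and levels are illustrative):

```python
import itertools

# Dictionary of scenarios: each parameter and the levels to test
scenarios = {
    'number_of_nurses': [1, 2, 3],
    'patient_inter': [4, 5, 6],
}

# All possible permutations of the scenario parameters
keys, values = zip(*scenarios.items())
permutations = [dict(zip(keys, combo))
                for combo in itertools.product(*values)]

# Enumerate through the scenarios and run each one
for i, scenario in enumerate(permutations):
    # Write this scenario's parameters into the g class, then run a trial
    for param, value in scenario.items():
        setattr(g, param, value)
    g.scenario_name = f'scenario_{i}'
    my_trial = Trial()
    my_trial.run_trial()
```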
Determining how many replications to run#
‘You will now use the confidence interval method to select the number of replications to run in order to get a good estimate of the model’s mean performance. The narrower the confidence interval, the more precise our estimate of the mean. In general, the more replications you run, the narrower the confidence interval. The method requires you to set a predefined width of the confidence interval’ - e.g. 5% or 10% either side of the mean. Use the following function to implement this in Python…[source]
Confidence interval method to determine replication number
‘The method is less useful for values very close to zero. As the utilisation measures are between 0 and 1, it is recommended that you multiply the values by 100.’
Function:
```python
import warnings

import numpy as np
import pandas as pd
from scipy.stats import t


def confidence_interval_method(replications, alpha=0.05,
                               desired_precision=0.05, min_rep=5,
                               decimal_place=2):
    '''
    The confidence interval method for selecting the number of replications
    to run in a simulation.

    Finds the smallest number of replications where the width of the
    confidence interval is less than the desired_precision.

    Returns both the number of replications and the full results dataframe.

    Parameters:
    ----------
    replications: arraylike
        Array (e.g. np.ndarray or list) of replications of a performance
        metric

    alpha: float, optional (default=0.05)
        procedure constructs a 100(1-alpha) confidence interval for the
        cumulative mean.

    desired_precision: float, optional (default=0.05)
        Desired mean deviation from the confidence interval.

    min_rep: int, optional (default=5)
        set to an integer > 0 to ignore all of the replications prior to it
        when selecting the number of replications to run to achieve the
        desired precision. Useful when the number of replications returned
        does not provide a stable precision below target.

    decimal_place: int, optional (default=2)
        sets the number of decimal places of the returned dataframe
        containing the results

    Returns:
    --------
    tuple: int, pd.DataFrame
    '''
    n = len(replications)
    cumulative_mean = [replications[0]]
    running_var = [0.0]
    for i in range(1, n):
        cumulative_mean.append(
            cumulative_mean[i-1]
            + (replications[i] - cumulative_mean[i-1]) / (i+1))

        # running biased variance
        running_var.append(
            running_var[i-1]
            + (replications[i] - cumulative_mean[i-1])
            * (replications[i] - cumulative_mean[i]))

    # unbiased std dev = sqrt(running_var / (n - 1))
    with np.errstate(divide='ignore', invalid='ignore'):
        running_std = np.sqrt(running_var / np.arange(n))

    # half width of interval
    dof = len(replications) - 1
    t_value = t.ppf(1 - (alpha / 2), dof)
    with np.errstate(divide='ignore', invalid='ignore'):
        std_error = running_std / np.sqrt(np.arange(1, n+1))
    half_width = t_value * std_error

    # upper and lower confidence interval
    upper = cumulative_mean + half_width
    lower = cumulative_mean - half_width

    # mean deviation
    with np.errstate(divide='ignore', invalid='ignore'):
        deviation = (half_width / cumulative_mean) * 100

    # combine results into a single dataframe
    results = pd.DataFrame([replications, cumulative_mean,
                            running_std, lower, upper, deviation]).T
    results.columns = ['Mean', 'Cumulative Mean', 'Standard Deviation',
                       'Lower Interval', 'Upper Interval', '% deviation']
    results.index = np.arange(1, n+1)
    results.index.name = 'replications'

    # get the smallest no. of reps where deviation is less than the
    # precision target
    try:
        n_reps = results.iloc[min_rep:].loc[results['% deviation']
                                            <= desired_precision*100].iloc[0].name
    except IndexError:
        # no replications with desired precision
        message = 'WARNING: the replications do not reach desired precision'
        warnings.warn(message)
        n_reps = -1

    return n_reps, results.round(decimal_place)
```
Using the function:

```python
# run the method on the operator_wait replications
n_reps, conf_ints = confidence_interval_method(
    replications['operator_wait'].to_numpy(),
    desired_precision=0.05)

# print out the min number of replications to achieve precision
print(f'\nminimum number of reps for 5% precision: {n_reps}\n')

# peek at table of results
conf_ints.head()
```
```python
def plot_confidence_interval_method(n_reps, conf_ints, metric_name,
                                    figsize=(12, 4)):
    '''
    Plot the confidence intervals and cumulative mean

    Parameters:
    ----------
    n_reps: int
        minimum number of reps selected

    conf_ints: pandas.DataFrame
        results of the `confidence_interval_method` function

    metric_name: str
        Name of the performance measure

    figsize: tuple, optional (default=(12,4))
        The size of the plot

    Returns:
    -------
    matplotlib.axes.Axes
    '''
    # plot cumulative mean + lower/upper intervals
    ax = conf_ints[['Cumulative Mean', 'Lower Interval',
                    'Upper Interval']].plot(figsize=figsize)

    # add a vertical line at the selected number of replications
    ax.axvline(x=n_reps, ls='--', color='red')
    ax.set_ylabel(f'cumulative mean: {metric_name}')

    return ax


# plot the confidence intervals
ax = plot_confidence_interval_method(n_reps, conf_ints,
                                     metric_name='operator_wait')
```
```python
# quick check whether the % deviation remains below 5% for the next
# `lookahead` reps
lookahead = 15
conf_ints.iloc[n_reps-1:n_reps+lookahead]
```
```python
# re-run the method on the operator_wait replications, this time ignoring
# the replications before min_rep=36
n_reps, conf_ints = confidence_interval_method(
    replications['operator_wait'].to_numpy(),
    desired_precision=0.05, min_rep=36)

# print out the min number of replications to achieve precision
print(f'\nminimum number of reps for 5% precision: {n_reps}\n')

# plot the confidence intervals
ax = plot_confidence_interval_method(n_reps, conf_ints,
                                     metric_name='operator_wait',
                                     figsize=(9, 6))
```
Appointment booking (i.e. scheduling)#
We may want to model appointment booking (i.e. a delay between entering the system to book an appointment and attending the appointment), rather than immediate attendance. See the example from HSMA, which is based on Tom's example.
Overview of the core changes to the HSMA code for appointment booking (with the time unit here being days rather than minutes):
| Class | Modifications |
|---|---|
| Global parameters | New attributes: |
| Patient | New attributes: |
| Booker | Attributes: |
| Model | New attributes: |
| Trial | Save new metrics (appointment wait) |
You can allow patients to have a range of possible clinics that they can book into. You can do this with a pooling matrix: a simple table of 0s and 1s determining whether a patient can access each clinic. The package sim_utility has some pre-made Booker classes. To work with pooled booking, you could use one of its pooled booker classes, e.g. `LowPriorityPooledBooker` from sim_utility (rather than `LowPriorityBooker`). [source]
Parallelisation#
By default, Python runs everything sequentially on a single core. We can run code in parallel to reduce the run length. Use Joblib to split SimPy code across multiple processor cores (though this may not be possible when deploying on the web).
How does Joblib work?
We can illustrate this with a simple function:
```python
import time
import math
from joblib import Parallel, delayed


def my_function(i):
    # Wait one second
    time.sleep(1)
    # Return square root of i**2
    return math.sqrt(i**2)
```
If we run this function in a for loop with num = 10, it takes ten seconds:

```python
num = 10
start = time.time()

# Run function in for loop
for i in range(num):
    my_function(i)

end = time.time()

# Print time elapsed
print('{:.4f} s'.format(end-start))
```

10.0387 s
If we run it using the Parallel and delayed functions from joblib, it takes around five seconds.[source] We use the delayed() function because it prevents the function call from executing immediately. If we just had `Parallel(n_jobs=2)(my_function(i) for i in range(num))`, then by the time the generator is passed to Parallel, the `my_function(i)` calls would have already returned, and there would be nothing left for Parallel to execute. [source]
```python
start = time.time()

# Run function using parallel processing
# n_jobs is the number of parallel jobs
Parallel(n_jobs=2)(delayed(my_function)(i) for i in range(num))

end = time.time()

# Print time elapsed
print('{:.4f} s'.format(end-start))
```

5.6560 s
An overview of the changes we make to implement this for our SimPy model is below. These are from this HSMA example, which is based on an example from Mike.
| Class | Changes |
|---|---|
| Global parameters | - |
| Patient | - |
| Model | - |
| Trial | • Change how results are saved: append a dictionary of results from each run to a list, rather than setting up a dummy dataframe and using .loc to write results into the correct row (which leaves you with empty results when running in parallel) - see the sketch below |
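A minimal sketch of a parallelised Trial, assuming the Model and g classes from the basic example above (the exact HSMA implementation may differ):

```python
import pandas as pd
from joblib import Parallel, delayed


class Trial:
    def run_single(self, run):
        # Conduct one run of the model and return its results as a dict
        my_model = Model(run)
        my_model.run()
        return {'Run Number': run,
                'Mean Q Time Nurse': my_model.mean_q_time_nurse}

    def run_trial(self):
        # Run replications across all available cores; collect the list of
        # result dictionaries rather than writing into a shared dataframe
        results = Parallel(n_jobs=-1)(
            delayed(self.run_single)(run)
            for run in range(g.number_of_runs))

        # Combine the dictionaries into the trial results dataframe
        self.df_trial_results = (
            pd.DataFrame(results).set_index('Run Number'))
        print(self.df_trial_results)
```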
Input modelling#
Mike wrote some code (archived, but copied below) which fits various distributions to the data, then conducts Chi-squared and KS tests and ranks the distributions by the Chi-squared statistic. It produces histograms, p-p plots and q-q plots of the data against the theoretical distributions.
Auto fit
import warnings
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
import scipy
import matplotlib.pyplot as plt
warnings.filterwarnings("ignore")
def auto_fit(data_to_fit, hist=False, pp=False, dist_names=None):
y = data_to_fit.dropna().to_numpy()
size = len(y)
sc = StandardScaler()
yy = y.reshape(-1, 1)
sc.fit(yy)
y_std = sc.transform(yy)
y_std = y_std.flatten()
del yy
if dist_names is None:
dist_names = ['beta',
'expon',
'gamma',
'lognorm',
'norm',
'pearson3',
'weibull_min',
'weibull_max']
# Set up empty lists to store results
chi_square = []
p_values = []
# Set up 50 bins for chi-square test
# Observed data will be approximately evenly distrubuted aross all bins
percentile_bins = np.linspace(0,100,50)
percentile_cutoffs = np.percentile(y_std, percentile_bins)
#"fix": sometimes np.percentile produces cut-off that are not sorted!?
#Why? The test data was synthetic LoS in an ED and they are rounded numbers...
observed_frequency, bins = (np.histogram(y_std, bins=np.sort(percentile_cutoffs)))
cum_observed_frequency = np.cumsum(observed_frequency)
# Loop through candidate distributions
for distribution in dist_names:
# Set up distribution and get fitted distribution parameters
dist = getattr(scipy.stats, distribution)
param = dist.fit(y_std)
# Obtain the KS test P statistic, round it to 5 decimal places
p = scipy.stats.kstest(y_std, distribution, args=param)[1]
p = np.around(p, 5)
p_values.append(p)
# Get expected counts in percentile bins
# This is based on a 'cumulative distrubution function' (cdf)
cdf_fitted = dist.cdf(percentile_cutoffs, *param[:-2], loc=param[-2],
scale=param[-1])
expected_frequency = []
for bin in range(len(percentile_bins)-1):
expected_cdf_area = cdf_fitted[bin+1] - cdf_fitted[bin]
expected_frequency.append(expected_cdf_area)
# calculate chi-squared
expected_frequency = np.array(expected_frequency) * size
cum_expected_frequency = np.cumsum(expected_frequency)
ss = sum (((cum_expected_frequency - cum_observed_frequency) ** 2) / cum_observed_frequency)
chi_square.append(ss)
# Collate results and sort by goodness of fit (best at top)
results = pd.DataFrame()
results['Distribution'] = dist_names
results['chi_square'] = chi_square
results['p_value'] = p_values
results.sort_values(['chi_square'], inplace=True)
# Report results
print('\nDistributions sorted by goodness of fit:')
print('----------------------------------------')
print(results)
if hist:
fitted_histogram(y, results)
if pp:
pp_qq_plots(y, y_std, dist_names)
def fitted_histogram(y, results):
size = len(y)
x = np.arange(len(y))
# Divide the observed data into 100 bins for plotting (this can be changed)
number_of_bins = 100
bin_cutoffs = np.linspace(np.percentile(y,0), np.percentile(y,99),number_of_bins)
# Create the plot
h = plt.hist(y, bins = bin_cutoffs, color='0.75')
# Get the top three distributions from the previous phase
number_distributions_to_plot = 5
dist_names = results['Distribution'].iloc[0:number_distributions_to_plot]
# Create an empty list to stroe fitted distribution parameters
parameters = []
# Loop through the distributions ot get line fit and paraemters
for dist_name in dist_names:
# Set up distribution and store distribution paraemters
dist = getattr(scipy.stats, dist_name)
param = dist.fit(y)
parameters.append(param)
# Get line for each distribution (and scale to match observed data)
pdf_fitted = dist.pdf(x, *param[:-2], loc=param[-2], scale=param[-1])
scale_pdf = np.trapz (h[0], h[1][:-1]) / np.trapz (pdf_fitted, x)
pdf_fitted *= scale_pdf
# Add the line to the plot
plt.plot(pdf_fitted, label=dist_name)
# Set the plot x axis to contain 99% of the data
# This can be removed, but sometimes outlier data makes the plot less clear
plt.xlim(0,np.percentile(y,99))
# Add legend and display plot
plt.legend()
plt.show()
# Store distribution paraemters in a dataframe (this could also be saved)
dist_parameters = pd.DataFrame()
dist_parameters['Distribution'] = (
results['Distribution'].iloc[0:number_distributions_to_plot])
dist_parameters['Distribution parameters'] = parameters
# Print parameter results
print ('\nDistribution parameters:')
print ('------------------------')
for index, row in dist_parameters.iterrows():
print ('\nDistribution:', row[0])
print ('Parameters:', row[1] )
def pp_qq_plots(y, y_std, dist_names):
## qq and pp plots
data = y_std.copy()
data.sort()
size = len(y)
x = np.arange(len(y))
x = x.reshape(-1, 1)
y = y.reshape(-1, 1)
# Loop through selected distributions (as previously selected)
for distribution in dist_names:
# Set up distribution
dist = getattr(scipy.stats, distribution)
param = dist.fit(y_std)
# Get random numbers from distribution
norm = dist.rvs(*param[0:-2],loc=param[-2], scale=param[-1],size = size)
norm.sort()
# Create figure
fig = plt.figure(figsize=(8,5))
# qq plot
ax1 = fig.add_subplot(121) # Grid of 2x2, this is suplot 1
ax1.plot(norm,data,"o")
min_value = np.floor(min(min(norm),min(data)))
max_value = np.ceil(max(max(norm),max(data)))
ax1.plot([min_value,max_value],[min_value,max_value],'r--')
ax1.set_xlim(min_value,max_value)
ax1.set_xlabel('Theoretical quantiles')
ax1.set_ylabel('Observed quantiles')
title = 'qq plot for ' + distribution +' distribution'
ax1.set_title(title)
# pp plot
ax2 = fig.add_subplot(122)
# Calculate cumulative distributions
bins = np.percentile(norm,range(0,101))
data_counts, bins = np.histogram(data,bins)
norm_counts, bins = np.histogram(norm,bins)
cum_data = np.cumsum(data_counts)
cum_norm = np.cumsum(norm_counts)
cum_data = cum_data / max(cum_data)
cum_norm = cum_norm / max(cum_norm)
# plot
ax2.plot(cum_norm,cum_data,"o")
min_value = np.floor(min(min(cum_norm),min(cum_data)))
max_value = np.ceil(max(max(cum_norm),max(cum_data)))
ax2.plot([min_value,max_value],[min_value,max_value],'r--')
ax2.set_xlim(min_value,max_value)
ax2.set_xlabel('Theoretical cumulative distribution')
ax2.set_ylabel('Observed cumulative distribution')
title = 'pp plot for ' + distribution +' distribution'
ax2.set_title(title)
# Display plot
plt.tight_layout(pad=4)
plt.show()
We can then run the code as follows: [source]
```python
import numpy as np
import pandas as pd

from input_modelling.fitting import auto_fit

rng = np.random.default_rng(42)
samples = rng.exponential(scale=32, size=10_000)
samples = pd.DataFrame(samples)
samples.head()

auto_fit(samples)

# Plot the distributions and use a few extra options
dists_to_test = ['expon', 'gamma']
auto_fit(samples, hist=True, pp=True, dist_names=dists_to_test)
```
Viewing results from the model#
I have made limited notes on output analysis, as these notes mainly focus on how the model is set up. However, you can see more about output analysis on each of the HSMA DES book pages (which typically show the outputs from all the different variants of model setup, using various visualisations).