Analysis of pollution, climate and health data in Vitolo et al., Earth and Space Science (2018)
This is a short HOWTO describing the data analysis performed to learn the Bayesian network spanning the pollution, climate and health data in “Modelling Air Pollution, Climate and Health Data Using Bayesian Networks: a Case Study of the English Regions” by Vitolo, Scutari, Ghalaieny, Tucker and Russell (Earth and Space Science, 2018). The code was initially prototyped by Claudia Vitolo and then parallelised and extended by Marco Scutari; it implements a parallel version of the Structural EM algorithm for Bayesian network structure learning in the presence of missing data.
The data
The data (codenamed MEHRA from “Multi-dimensional Environment-Health Risk Analysis”) contains almost 50 million observations to explore the interplay between environmental factors, exposure levels to outdoor air pollutants, and health outcomes in the English regions of the United Kingdom between 1981 and 2014. The CLGBN learned in the paper (in which discrete, Gaussian and conditional linear Gaussian nodes are colour-coded) is the following:
It comprises 24 variables describing the concentrations of various air pollutants (O3, PM2.5, PM10, SO2, NO2, CO), geography (latitude, longitude, altitude, region and zone type), climate (wind speed and direction, temperature, rainfall, solar radiation), demography and mortality rates. Many of these variables contain large amounts of missing values, corresponding to measurements that were not available in the older years' data.
The data is available from:
Claudia Vitolo, Marco Scutari, Mohamed Ghalaieny, Allan Tucker and Andrew Russell. (2017). MEHRA data and model for the English regions [Data set]. Zenodo. https://doi.org/10.5281/zenodo.838571
Learning the Bayesian network
First we load the data and the packages we will use to learn the model: bnlearn for the Structural EM and parallel to spread the computation across several CPUs.
> library(bnlearn)
> library(parallel)
>
> training = readRDS("trainingESS.rds")
Other than the data, we need to inform structure learning about which arcs are illogical given our background knowledge of the data (e.g. pollutants such as co and o3 influencing the Day). This will be used as a massive blacklist to reduce the number of candidate networks we will need to score.
> bl = data.frame(
+   "from" = c(rep("Region", 10), rep("Zone", 10), rep("Type", 10), rep("Year", 10),
+              rep("Season", 10), rep("Month", 10), rep("Day", 10), rep("Hour", 10),
+              rep("Latitude", 10), rep("Longitude", 10), rep("Altitude", 10),
+              rep("CVD60", 23), rep("t2m", 11), rep("ws", 11), rep("wd", 11),
+              rep("tp", 11), rep("blh", 11), rep("ssr", 11), rep("no2", 11),
+              rep("so2", 11), rep("co", 11), rep("o3", 11), rep("pm10", 11),
+              rep("pm2.5", 11)),
+   "to" = c("Zone", "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
+            "Longitude", "Altitude", "Region", "Type", "Year", "Season", "Month",
+            "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
+            "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "Region", "Zone", "Type", "Season", "Month", "Day",
+            "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
+            "Type", "Year", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "Region", "Zone", "Type", "Year", "Season", "Day", "Hour",
+            "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type", "Year",
+            "Season", "Month", "Hour", "Latitude", "Longitude", "Altitude",
+            "Region", "Zone", "Type", "Year", "Season", "Month", "Day",
+            "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type", "Year",
+            "Season", "Month", "Day", "Hour", "Longitude", "Altitude", "Region",
+            "Zone", "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
+            "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
+            "Day", "Hour", "Latitude", "Longitude", "Region", "Zone", "Type",
+            "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "t2m", "ws", "wd", "tp", "blh", "ssr", "no2", "o3",
+            "so2", "co", "pm10", "pm2.5", "Region", "Zone", "Type", "Year",
+            "Season", "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
+            "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
+            "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type",
+            "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
+            "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
+            "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
+            "Longitude", "Altitude", "Region", "Zone", "Type", "Year", "Season",
+            "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
+            "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
+            "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type",
+            "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
+            "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
+            "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
+            "Longitude", "Altitude", "Region", "Zone", "Type", "Year", "Season",
+            "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
+            "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
+            "Latitude", "Longitude", "Altitude"))
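Writing the blacklist out in full, as above, makes it easy to audit; an equivalent block of arcs can also be generated more compactly with expand.grid(). The blacklist.block() helper below is hypothetical (it is not part of the original analysis) and only sketches the pattern for one block:

```r
# hypothetical helper (not in the original analysis): blacklist every arc
# from each variable in from.set to each variable in to.set, minus self-loops.
blacklist.block = function(from.set, to.set) {

  bl = expand.grid(from = from.set, to = to.set, stringsAsFactors = FALSE)
  bl[bl$from != bl$to, ]

}#BLACKLIST.BLOCK

# for instance, no spatio-temporal variable may be the parent of another one:
space.time = c("Region", "Zone", "Type", "Year", "Season", "Month", "Day",
               "Hour", "Latitude", "Longitude", "Altitude")
bl.block = blacklist.block(space.time, space.time)
nrow(bl.block)  # 11 * 10 = 110 arcs, matching the rep(..., 10) blocks above
```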
Having set up the data and the blacklist, we now create the variables we will use to track the status of Structural EM:
- incompleteColumns: the names of the variables that contain missing data;
- rowsCompleteObservations and completeObservations: the indexes of the observations that do not contain any missing data, and the corresponding subset of complete observations;
- dagCurrent and bnCurrent: the bn and bn.fit objects encoding the candidate Bayesian network from the previous iteration;
- dagNew and bnNew: the bn and bn.fit objects encoding the new candidate Bayesian network at the end of the current iteration.
> incompleteColumns = names(which(sapply(training, anyNA)))
> rowsCompleteObservations = which(complete.cases(training))
> completeObservations = training[rowsCompleteObservations, ]
> dagNew = dagCurrent = empty.graph(names(completeObservations))
> bnNew = bnCurrent = bn.fit(dagCurrent, completeObservations)
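On a toy data frame (made up here purely for illustration), these bookkeeping variables behave as follows:

```r
# toy data frame with missing values, made up for illustration.
toy = data.frame(a = c(1, NA, 3), b = c("x", "y", "z"), c = c(NA, 2, 3))
# the variables with at least one missing value.
incompleteColumns = names(which(sapply(toy, anyNA)))
incompleteColumns             # "a" and "c"
# the indexes of the observations with no missing values, and their subset.
rowsCompleteObservations = which(complete.cases(toy))
rowsCompleteObservations      # only the third observation is complete
completeObservations = toy[rowsCompleteObservations, ]
```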
The last thing left to do to prepare for the Structural EM is to initialise the parallel cluster:
- create 10 slave processes;
- load bnlearn in each slave;
- export the blacklist bl to each slave;
- split the data into 10 subsets and export one to each slave.
> cl = makeCluster(10)
> invisible(clusterEvalQ(cl, library(bnlearn)))
> clusterExport(cl, "bl")
>
> # split the data into as many parts as there are slave processes, and export
> # one part to each slave.
> splitted = split(sample(nrow(training)), seq(length(cl)))
>
> for (i in seq_along(splitted)) {
+   data_split = training[splitted[[i]], , drop = FALSE]
+   clusterExport(cl[i], "data_split")
+ }#FOR
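Note that split() recycles the second argument, so the shuffled row indexes are assigned round-robin: the subsets have (nearly) equal sizes and together cover the whole data set. A small self-contained illustration, with made-up dimensions:

```r
set.seed(42)  # made-up seed, only to make the illustration reproducible.
n.rows = 1000
n.slaves = 10
# split the shuffled row indexes round-robin over the slaves.
splitted = split(sample(n.rows), seq(n.slaves))
lengths(splitted)  # ten subsets of 100 rows each
# together, the subsets partition the full set of row indexes.
all(sort(unname(unlist(splitted))) == seq_len(n.rows))  # TRUE
```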
Considering the size of the data, we want to perform as much computation as possible in parallel; to do so, we export the function impute_split() below to perform imputation on each subset of data in the slave processes. (Note that this function predates the impute() function currently implemented in bnlearn, but does more or less the same thing.)
> impute_split = function(n = 50) {
+   nodes = nodes(bnCurrent)
+   # variables corresponding to isolated nodes can be quickly imputed by their
+   # expectations.
+   for (var in nodes) {
+     missing = is.na(data_split[, var])
+     if ((length(nbr(bnCurrent, var)) == 0) && (any(missing)))
+       data_split[missing, var] =
+         rnorm(length(which(missing)), mean = bnCurrent[[var]]$coef,
+           sd = bnCurrent[[var]]$sd / sqrt(n))
+   }#FOR
+   # reassess which observations have missing data.
+   missing = !complete.cases(data_split)
+   for (i in which(missing)) {
+     from = nodes[which(!is.na(data_split[i, ]))]
+     to = setdiff(nodes, from)
+     # use the observed part of the observation as the evidence.
+     if (length(from) == 0)
+       evidence = TRUE
+     else
+       evidence = lapply(data_split[i, from],
+                    function(x) if (is.factor(x)) as.character(x) else x)
+     # simulate the particles and the weights using likelihood weighting.
+     particles =
+       cpdist(bnCurrent, nodes = to, evidence = evidence, method = "lw", n = n)
+     # impute by posterior mode (discrete variables) or posterior expectation
+     # (continuous variables).
+     particles = sapply(particles, function(x, w) {
+       if (is.factor(x))
+         names(which.max(by(w, INDICES = x, FUN = sum)))
+       else if (is.numeric(x))
+         weighted.mean(x = x, w = w)
+     }, w = attr(particles, "weights"))
+     data_split[i, to] = particles
+   }#FOR
+   return(data_split)
+ }#IMPUTE_SPLIT
>
> clusterExport(cl, "impute_split")
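The final sapply() in impute_split() implements the actual imputation rule. The snippet below replays that rule on made-up particles and likelihood weights, using tapply() in place of the equivalent by() call:

```r
# made-up particles for one incomplete observation: a discrete and a
# continuous variable, with likelihood weights attached.
disc = factor(c("a", "a", "b"))
cont = c(1, 2, 10)
w = c(0.5, 0.3, 0.2)
# discrete variable: impute by the posterior mode, i.e. the level with the
# largest total weight.
posterior.mode = names(which.max(tapply(w, disc, sum)))
posterior.mode         # "a", with total weight 0.8 versus 0.2 for "b"
# continuous variable: impute by the posterior expectation, i.e. the
# weighted mean of the particles.
posterior.expectation = weighted.mean(cont, w = w)
posterior.expectation  # 3.1
```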
This is the main loop that iterates the expectation and maximisation steps of Structural EM, performing up to 10 steps before stopping. The two steps are as follows:
- Expectation step: missing values are imputed with impute_split() on each slave.
- Maximisation step: the structure of the network is learned with hc() in parallel on the data in each slave, and the resulting networks are averaged with averaged.network() to produce a consensus network for the whole data set. The parameters of this network are then learned with bn.fit().

The loop stops when the new candidate network dagNew is identical to the current network dagCurrent.
> for (iteration in seq(10)) {
+   # export the current network.
+   dagCurrent = dagNew
+   bnCurrent = bnNew
+   clusterExport(cl, c("dagCurrent", "bnCurrent"))
+   # expectation step: impute the missing data points on the data splits.
+   current = training
+   invisible(clusterEvalQ(cl, {
+     complete = impute_split()
+     NULL
+   }))
+   # maximization step: learn one network structure from each split and build
+   # a consensus network using model averaging.
+   models = parLapply(cl, seq(length(cl)), function(...) {
+     # starting from the previous network assumes that the new network will be
+     # similar, with few arc changes to evaluate.
+     hc(complete, blacklist = bl, start = dagCurrent)
+   })
+   # average the networks (and make sure the result is completely directed).
+   strengthNew = custom.strength(models, nodes = nodes(dagCurrent))
+   dagNew = averaged.network(strengthNew)
+   dagNew = cextend(dagNew)
+   # if there was no change, the network from the previous iteration is final.
+   if (isTRUE(all.equal(dagCurrent, dagNew)))
+     break
+   # retrieve the imputed values.
+   for (i in incompleteColumns) {
+     imputed = clusterCall(cl, function(col) complete[, col], col = i)
+     current[unlist(splitted), i] = unlist(imputed)
+   }#FOR
+   # fit the parameters.
+   bnNew = bn.fit(dagNew, data = current, keep.fitted = FALSE)
+   # save the DAG, the arc strengths and the fitted BN.
+   save(strengthNew, file = paste("strengthNew-", iteration, ".rda", sep = ""))
+   save(dagNew, file = paste("dagNew-", iteration, ".rda", sep = ""))
+   save(bnNew, file = paste("bnNew-", iteration, ".rda", sep = ""))
+   save(imputed, file = paste("imputed-", iteration, ".rda", sep = ""))
+ }#FOR
>
> stopCluster(cl)
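The consensus step of the maximisation can be illustrated without bnlearn: custom.strength() estimates each arc's strength as its relative frequency across the networks learned from the splits, and averaged.network() keeps the arcs whose strength exceeds a threshold. A minimal base-R sketch with made-up arc sets and a fixed 0.5 threshold (averaged.network() instead estimates the threshold from the strengths themselves):

```r
# made-up arc sets learned from three data splits, as (from, to) pairs.
arcs1 = data.frame(from = c("A", "B"), to = c("B", "C"))
arcs2 = data.frame(from = c("A", "B", "A"), to = c("B", "C", "C"))
arcs3 = data.frame(from = "A", to = "B")
all.arcs = rbind(arcs1, arcs2, arcs3)
# arc strength = relative frequency of the arc across the learned networks.
strength = table(paste(all.arcs$from, all.arcs$to, sep = "->")) / 3
# keep the arcs appearing in more than half of the networks.
consensus = names(strength[strength > 0.5])
consensus  # "A->B" and "B->C" make the consensus network, "A->C" does not
```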
Tue Jan 30 12:08:08 2018 with bnlearn 4.3 and R version 3.0.2 (2013-09-25).