Analysis of pollution, climate and health data in Vitolo et al., Earth and Space Science (2018)

This is a short HOWTO describing the data analysis performed to learn the Bayesian network spanning the pollution, climate and health data in “Modelling Air Pollution, Climate and Health Data Using Bayesian Networks: a Case Study of the English Regions.” by Vitolo, Scutari, Ghalaieny, Tucker and Russell (Earth and Space Science, 2018). The code was initially prototyped by Claudia Vitolo and then parallelised and extended by Marco Scutari; it implements a parallel versions of the Structural EM algorithm for Bayesian network structure learning in the presence of missing data.

The data

The data (codenamed MEHRA from “Multi-dimensional Environment-Health Risk Analysis”) contains almost 50 million observations to explore the interplay between environmental factors, exposure levels to outdoor air pollutants, and health outcomes in the English regions of the United Kingdom between 1981 and 2014. The CLGBN learned in the paper (yellow nodes are discrete, blue nodes are Gaussian, and yellow nodes are conditional linear Gaussian) is the following:

reference network from the paper

It comprises 24 variables describing the concentrations of various air pollutants (O3, PM2.5, PM10, SO2, NO2, CO), geography (latitude, longitude, latitude, region and zone type), climate (wind speed and direction, temperature, rainfall, solar radiation), demography and mortality rates. Many of these variables contains large amounts of missing values, corresponding to measurements that were not available in the older years' data.

The data is available from:

Claudia Vitolo, Marco Scutari, Mohamed Ghalaieny, Allan Tucker and Andrew Russell. (2017). MEHRA data and model for the English regions [Data set]. Zenodo. https://doi.org/10.5281/zenodo.838571

Learning the Bayesian network

First we load the data and the packages we will use to learn the model: bnlearn for the Structural EM and parallel to spread the computation across several CPUs.

> library(bnlearn)
> library(parallel)
> 
> training = readRDS("trainingESS.rds")

Other than the data, we need to inform structure learning about which arcs are illogical given our background knowledge of the data (e.g. pollutants such as co and o3 influencing the Day). This will be used as a massive blacklist to reduce the number of candidate networks we will need to score.

> bl = data.frame(
+   "from" = c(rep("Region", 10), rep("Zone", 10), rep("Type", 10), rep("Year", 10),
+              rep("Season", 10), rep("Month", 10), rep("Day", 10), rep("Hour", 10),
+              rep("Latitude", 10), rep("Longitude", 10), rep("Altitude", 10),
+              rep("CVD60", 23), rep("t2m", 11), rep("ws", 11), rep("wd", 11),
+              rep("tp", 11), rep("blh", 11), rep("ssr", 11), rep("no2", 11),
+              rep("so2", 11), rep("co", 11), rep("o3", 11), rep("pm10", 11),
+              rep("pm2.5", 11)),
+   "to" = c("Zone", "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
+            "Longitude", "Altitude", "Region", "Type", "Year", "Season", "Month",
+            "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
+            "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "Region", "Zone", "Type", "Season", "Month", "Day",
+            "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
+            "Type", "Year", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "Region", "Zone", "Type", "Year", "Season", "Day", "Hour",
+            "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type", "Year",
+            "Season", "Month", "Hour", "Latitude", "Longitude", "Altitude",
+            "Region", "Zone", "Type", "Year", "Season", "Month", "Day",
+            "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type", "Year",
+            "Season", "Month", "Day", "Hour", "Longitude", "Altitude", "Region",
+            "Zone", "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
+            "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
+            "Day", "Hour", "Latitude", "Longitude", "Region", "Zone", "Type",
+            "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "t2m", "ws", "wd", "tp", "blh", "ssr", "no2", "o3",
+            "so2", "co", "pm10", "pm2.5", "Region", "Zone", "Type", "Year",
+            "Season", "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
+            "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
+            "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type",
+            "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
+            "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
+            "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
+            "Longitude", "Altitude", "Region", "Zone", "Type", "Year", "Season",
+            "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
+            "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
+            "Latitude", "Longitude", "Altitude", "Region", "Zone", "Type",
+            "Year", "Season", "Month", "Day", "Hour", "Latitude", "Longitude",
+            "Altitude", "Region", "Zone", "Type", "Year", "Season", "Month",
+            "Day", "Hour", "Latitude", "Longitude", "Altitude", "Region", "Zone",
+            "Type", "Year", "Season", "Month", "Day", "Hour", "Latitude",
+            "Longitude", "Altitude", "Region", "Zone", "Type", "Year", "Season",
+            "Month", "Day", "Hour", "Latitude", "Longitude", "Altitude",
+            "Region", "Zone", "Type", "Year", "Season", "Month", "Day", "Hour",
+            "Latitude", "Longitude", "Altitude"))

Having set up the data and the blacklist, we now create the variables we will use to track the status of Structural EM:

  • incompleteColumns: the names of the variables that contain missing data;
  • rowsCompleteObservations and completeObservations: the indexes of the observations that contain missing data, and the subset of complete observations;
  • dagCurrent and bnCurrent: the bn and bn.fit objects encoding the candidate Bayesian network from the previous iteration;
  • dagNew and bnNew: the bn and bn.fit objects encoding the new candidate Bayesian network at the end of the current iteration.
> incompleteColumns = names(which(sapply(training, anyNA)))
> rowsCompleteObservations = which(complete.cases(training))
> completeObservations = training[rowsCompleteObservations, ]
> dagNew = dagCurrent = empty.graph(names(completeObservations))
> bnNew = bnCurrent = bn.fit(dagCurrent, completeObservations)

The last thing left to do to prepare for the Structural EM is to initialise the parallel cluster:

  • create 10 slave processes;
  • load bnlearn in each slave;
  • export the blacklist bl to each slave;
  • split the data into 10 subsets and export one to each slave.
> cl = makeCluster(10)
> invisible(clusterEvalQ(cl, library(bnlearn)))
> clusterExport(cl, "bl")
> 
> # split the data into as many parts as there are slave processes, and export
> # one part to each slave.
> splitted = split(sample(nrow(training)), seq(length(cl)))
> 
> for (i in seq_along(splitted)) {

+   data_split = training[splitted[[i]], , drop = FALSE]
+   clusterExport(cl[i], "data_split")

+ }#FOR

Considering the size of the data, we want to perform as much computation as possible in parallel; in order to do so we export the function impute_split() below to perform imputation on each subset of data in the slave processes. (Note that this function predates the impute() function currently implemented in bnlearn, but does more or less the same thing.)

> impute_split = function(n = 50) {

+   nodes = nodes(bnCurrent)

+   # variables corresponding to isolated nodes can be quickly imputed by their
+   # expectations.
+   for (var in nodes) {

+     missing = is.na(data_split[, var])

+     if ((length(nbr(bnCurrent, var)) == 0) && (any(missing)))
+       data_split[missing, var] =
+         rnorm(length(which(missing)), mean = bnCurrent[[var]]$coef,
+           sd = bnCurrent[[var]]$sd / sqrt(n))

+   }#FOR

+   # reassess which observations have missing data.
+   missing = !complete.cases(data_split)

+   for (i in which(missing)) {

+     from = nodes[which(!is.na(data_split[i, ]))]
+     to = setdiff(nodes, from)

+     # use the observed part of the observation as the evidence.
+     if (length(from) == 0)
+       evidence = TRUE
+     else
+       evidence = lapply(data_split[i, from],
+                    function(x) if (is.factor(x)) as.character(x) else x)

+     # simulate the particles and the weights using likelihood weighting.
+     particles =
+       cpdist(bnCurrent, nodes = to, evidence = evidence, method = "lw", n = n)

+     # impute by posterior mode (discrete variables) or posterior expectation
+     # (continuous variables).
+     particles = sapply(particles, function(x, w) {

+       if (is.factor(x))
+         names(which.max(by(w, INDICES = x, FUN = sum)))
+       else if (is.numeric(x))
+         weighted.mean(x = x, w = w)

+     }, w = attr(particles, "weights"))

+     data_split[i, to] = particles

+   }#FOR

+   return(data_split)

+ }#IMPUTE_SPLIT
> 
> clusterExport(cl, "impute_split")

This is the main loop that iterates the expectation and maximisation steps of Structural EM, performing up to 10 steps before stopping. The two steps are as follows:

  1. Expectation step: missing values are imputed with impute_split() on each slave.
  2. Maximisation step: the structure of the network is learned with hc() in parallel on the data in each slave, and the resulting networks are averaged with averaged.network() to produce a consensus network for the whole data set. The parameters of this network are then learned with bn.fit().

The loop stops when the new candidate network dagNew is identical to the current network dagCurrent.

> for (iteration in seq(10)) {

+   # export the current network.
+   dagCurrent = dagNew
+   bnCurrent = bnNew
+   clusterExport(cl, c("dagCurrent", "bnCurrent"))

+   # expectation step: impute the missing data points on the data splits.
+   current = training

+   invisible(clusterEvalQ(cl, {
+     complete = impute_split()
+     NULL
+   }))

+   # maximization step: learn one network structure from each split and build
+   # a consensus network using model averaging.
+   models = parLapply(cl, seq(length(cl)), function(...) {

+     # starting from the previous network assumes that the new network will be
+     # similar, with few arc changes to evaluate.
+     hc(complete, blacklist = bl, start = dagCurrent)

+   })

+   # average the networks (and make sure the result is completely directed).
+   strengthNew = custom.strength(models, nodes = nodes(dagCurrent))
+   dagNew = averaged.network(strengthNew)
+   dagNew = cextend(dagNew)

+   # if there was no change, the network from the previous iteration is final.
+   if (isTRUE(all.equal(dagCurrent, dagNew)))
+     break

+   # retrieve the imputed values.
+   for (i in incompleteColumns) {

+    imputed = clusterCall(cl, function(col) complete[, col], col = i)
+    current[unlist(splitted), i] = unlist(imputed)

+   }#THEN

+   # fit the parameters.
+   bnNew = bn.fit(dagNew, data = current, keep.fitted = FALSE)

+   # save the DAG, the arc strengths and the fitted BN.
+   save(strengthNew, file = paste("strengthNew-", iteration, ".rda", sep = ""))
+   save(dagNew, file = paste("dagNew-", iteration, ".rda", sep = ""))
+   save(bnNew, file = paste("bnNew-", iteration, ".rda", sep = ""))
+   save(imputed, file = paste("imputed-", iteration, ".rda", sep = ""))

+ }#FOR
> 
> stopCluster(cl)
Last updated on Tue Jan 30 12:08:08 2018 with bnlearn 4.3 and R version 3.0.2 (2013-09-25).