nssp pipeline code (#1952)
* to make nssp run in staging

* add nssp to Jenkinsfile

* nssp_token name change

* et code

* Update nssp/delphi_nssp/run.py

Co-authored-by: David Weber <[email protected]>

* Update nssp/README.md

Co-authored-by: David Weber <[email protected]>

* Update nssp/DETAILS.md

Co-authored-by: David Weber <[email protected]>

* Update nssp/delphi_nssp/__main__.py

Co-authored-by: nmdefries <[email protected]>

* Update nssp/delphi_nssp/pull.py

Co-authored-by: nmdefries <[email protected]>

* Update nssp/delphi_nssp/run.py

Co-authored-by: nmdefries <[email protected]>

* readme update

* column names mapping + signal name standardization to fit with other available sources and signals

* improve readability

* Add type_dict constant

* more type_dict

* add more unit test pull

* data for unit test of pull

* hrr + msa geos

* use enumerate for clarity

* set nssp sircal max_age to 13 days

* set nssp sircal max_age to 15 days, to account for nighttime run

* add validation to params

* Update nssp/DETAILS.md

Co-authored-by: nmdefries <[email protected]>

* Update nssp/delphi_nssp/constants.py

Co-authored-by: nmdefries <[email protected]>

* nssp correlation rmd and general notebook folder

* making Black happy

* update to new geomapper function

* following 120 line convention everywhere

* happy linter

* happy black formatter in nssp

* drop unneeded nssp tests

* updates borked old tests, caught by @dshemetov

* rebase woes and version consistency

* Update nssp-params-prod.json.j2 min/max lag to 13

* Update params.json.template min/max lag to 7 and 13

* missed column renames for geo_mapper, unneeded index

* lint+fix: update from linter changes
* add make format to nssp Makefile
* run make format on nssp

* ci: update ci to lint nssp

* lint: linter happy

* lint: pydocstyle happy

* lint: pydocstyle happy

* pct_visits to pct_ed_visits

---------

Co-authored-by: David Weber <[email protected]>
Co-authored-by: nmdefries <[email protected]>
Co-authored-by: Dmitry Shemetov <[email protected]>
4 people authored Jun 10, 2024
1 parent 38ea2da commit ae6f011
Showing 37 changed files with 3,589 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/python-ci.yml
@@ -31,6 +31,8 @@ jobs:
dir: "delphi_hhs"
- package: "nchs_mortality"
dir: "delphi_nchs_mortality"
+ - package: "nssp"
+   dir: "delphi_nssp"
- package: "nwss_wastewater"
dir: "delphi_nwss"
- package: "quidel_covidtest"
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -10,7 +10,7 @@
- TODO: #527 Get this list automatically from python-ci.yml at runtime.
*/

- def indicator_list = ["backfill_corrections", "changehc", "claims_hosp", "google_symptoms", "hhs_hosp", "nchs_mortality", "quidel_covidtest", "sir_complainsalot", "doctor_visits", "nwss_wastewater"]
+ def indicator_list = ["backfill_corrections", "changehc", "claims_hosp", "google_symptoms", "hhs_hosp", "nchs_mortality", "quidel_covidtest", "sir_complainsalot", "doctor_visits", "nwss_wastewater", "nssp"]
def build_package_main = [:]
def build_package_prod = [:]
def deploy_staging = [:]
2 changes: 1 addition & 1 deletion _delphi_utils_python/tests/test_weekday.py
@@ -58,4 +58,4 @@ def test_calc_adjustment(self):
# The date and "den" column are unchanged by this function
assert np.allclose(result["num"].values, expected_nums)
assert np.allclose(result["den"].values, self.TEST_DATA["den"].values)
- assert np.array_equal(result["date"].values, self.TEST_DATA["date"].values)
+ assert np.array_equal(result["date"].values, self.TEST_DATA["date"].values)
(the two lines are textually identical; the diff is a whitespace/end-of-file newline change)
29 changes: 29 additions & 0 deletions ansible/templates/nssp-params-prod.json.j2
@@ -0,0 +1,29 @@
{
  "common": {
    "export_dir": "/common/covidcast/receiving/nssp",
    "log_filename": "/var/log/indicators/nssp.log",
    "log_exceptions": false
  },
  "indicator": {
    "wip_signal": true,
    "static_file_dir": "./static",
    "socrata_token": "{{ nssp_token }}"
  },
  "validation": {
    "common": {
      "data_source": "nssp",
      "api_credentials": "{{ validation_api_key }}",
      "span_length": 15,
      "min_expected_lag": {"all": "7"},
      "max_expected_lag": {"all": "13"},
      "dry_run": true,
      "suppressed_errors": []
    },
    "static": {
      "minimum_sample_size": 0,
      "missing_se_allowed": true,
      "missing_sample_size_allowed": true
    },
    "dynamic": {}
  }
}
4 changes: 4 additions & 0 deletions ansible/templates/sir_complainsalot-params-prod.json.j2
@@ -50,6 +50,10 @@
"hhs": {
"max_age":15,
"maintainers": []
+    },
+    "nssp": {
+      "max_age":13,
+      "maintainers": []
}
}
}
3 changes: 3 additions & 0 deletions ansible/vars.yaml
@@ -56,6 +56,9 @@ nchs_mortality_token: "{{ vault_cdc_socrata_token }}"
# NWSS
nwss_wastewater_token: "{{ vault_cdc_socrata_token }}"

+ # nssp
+ nssp_token: "{{ vault_cdc_socrata_token }}"

# SirCAL
sir_complainsalot_api_key: "{{ vault_sir_complainsalot_api_key }}"
sir_complainsalot_slack_token: "{{ vault_sir_complainsalot_slack_token }}"
1 change: 1 addition & 0 deletions notebooks/.Rprofile
@@ -0,0 +1 @@
source("renv/activate.R")
257 changes: 257 additions & 0 deletions notebooks/nssp/cor_dashboard.Rmd
@@ -0,0 +1,257 @@
---
title: "Correlation Analyses for COVID-19 Indicators"
author: "Delphi Group"
date: "`r format(Sys.time(), '%B %d, %Y')`"
output:
html_document:
code_folding: hide
---

```{r, include = FALSE}
knitr::opts_chunk$set(message = FALSE, warning = FALSE,
                      fig.width = 8, fig.height = 7)
```

### Getting data
This requires that you've already run the nssp pipeline; see the `nssp` directory for instructions on doing that.
First, load some libraries and read the results from the pipeline:
```{r}
library(covidcast)
library(epidatr)
library(epiprocess) # provides as_epi_df(), used below
library(tidyverse)  # attaches dplyr, ggplot2, purrr, and readr
files <- list.files(here::here("nssp/receiving"), pattern="\\.csv$", full.names = TRUE)
read_row <- function(filename) {
  # Export filenames appear to follow the covidcast convention
  # weekly_<epiweek>_<geo type>_<signal name>.csv, so split on "_" to
  # recover the week, the geo type, and the (possibly underscored) signal.
  split_name <- filename %>%
    tools::file_path_sans_ext() %>%
    strsplit("/") %>% `[[`(1) %>% tail(n = 1) %>%
    strsplit("_") %>% `[[`(1)
  week_number <- split_name[[2]]
  geo_type <- split_name[[3]]
  col_name <- split_name[-(1:3)] %>% paste(collapse = "_")
  read_csv(filename, show_col_types = FALSE) %>%
    as_tibble() %>%
    mutate(signal = col_name,
           geo_type = geo_type,
           week_number = week_number) %>%
    mutate(across(geo_id, factor)) %>%
    rename(geo_value = geo_id, time_value = week_number) %>%
    select(-missing_se, -se, -sample_size, -missing_sample_size)
}
res <- map(files, read_row)
nssp_data <- bind_rows(res)
nssp_state <- nssp_data %>%
  filter(geo_type == "state") %>%
  mutate(time_value = epidatr:::parse_api_week(time_value)) %>%
  as_epi_df(time_type = "week", geo_type = "state") %>%
  select(-missing_val, -geo_type)
unique(nssp_data$time_value)
```
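For reference, `read_row` assumes the export filenames encode metadata, e.g. `weekly_<epiweek>_<geo>_<signal>.csv`. A quick sanity check of that parsing on a hypothetical filename (pure string manipulation, no file needed):
```{r}
# Hypothetical example filename; adjust to whatever the pipeline actually wrote.
fname <- "weekly_202410_state_pct_ed_visits_covid.csv"
parts <- strsplit(tools::file_path_sans_ext(fname), "_")[[1]]
parts[[2]]                           # "202410" -> week number
parts[[3]]                           # "state"  -> geo type
paste(parts[-(1:3)], collapse = "_") # "pct_ed_visits_covid" -> signal name
```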
And fetch the corresponding HHS hospital-admission signals via epidatr for comparison:
```{r}
eval_time <- epidatr::epirange(from = "2020-01-01", to = Sys.Date())
fetch_args <- epidatr::fetch_args_list(return_empty = TRUE, timeout_seconds = 300)
flu_hhs <- epidatr::pub_covidcast(
  source = "hhs",
  signals = "confirmed_admissions_influenza_1d_prop_7dav",
  geo_type = "state",
  time_type = "day",
  geo_values = "*",
  time_values = eval_time,
  fetch_args = fetch_args
) %>%
  select(-signal, -source, -time_type)
covid_hhs <- epidatr::pub_covidcast(
  source = "hhs",
  signals = "confirmed_admissions_covid_1d_prop_7dav",
  geo_type = "state",
  time_type = "day",
  geo_values = "*",
  time_values = eval_time,
  fetch_args = fetch_args
) %>%
  select(-signal, -source, -time_type)
nchs <- epidatr::pub_covidcast(
  source = "nchs-mortality",
  signals = "deaths_allcause_incidence_num",
  geo_type = "state",
  time_type = "week",
  geo_values = "*",
  time_values = epidatr::epirange(from = "202001", to = "202418"),
  fetch_args = fetch_args
)
```
# Flu
```{r}
library(epiprocess)
nssp_flu_state <- nssp_state %>%
  filter(signal == "pct_ed_visits_influenza") %>%
  select(-signal) %>%
  drop_na() %>%
  rename(pct_flu_visits = val) %>%
  as_epi_df(time_type = "week", geo_type = "state")
week_starts <- nssp_flu_state$time_value %>% unique()
flu_hhs_weekly <- flu_hhs %>%
  select(geo_value, time_value, value) %>%
  filter(time_value %in% week_starts) %>%
  rename(conf_admission = value) %>%
  drop_na() %>%
  as_epi_df(time_type = "week", geo_type = "state")
joined <- nssp_flu_state %>% left_join(flu_hhs_weekly)
```

With the join done, let's look at the overall correlation:
```{r}
cor(joined$pct_flu_visits, joined$conf_admission, method = "spearman")
```
So the overall correlation is pretty high.

## Correlations sliced by state
```{r}
correlations_space_flu <- epi_cor(joined, pct_flu_visits, conf_admission,
                                  cor_by = "geo_value", use = "complete.obs", method = "spearman")
library(maps) # For map data
states_map <- map_data("state")
mapped <- states_map %>%
  as_tibble() %>%
  mutate(geo_value = setNames(tolower(state.abb), tolower(state.name))[region]) %>%
  right_join(correlations_space_flu) %>%
  arrange(group, order)
library(viridis)
ggplot(mapped, aes(x = long, y = lat, group = group, fill = cor)) +
  geom_polygon(colour = "black") +
  scale_fill_viridis(discrete = FALSE, option = "viridis", limits = c(0, 1)) +
  coord_map("polyconic") +
  labs(title = "Spearman Correlations between Flu ER visits and Flu hospital admissions")
ggsave("flu_ER_admissions_state_correlations.pdf")
```
Over space, hospital admissions look highly correlated with ER visits, which makes sense: admissions frequently come in through the ER.
Summary statistics for the per-state correlations:
```{r}
correlations_space_flu %>%
  summarize(across(where(is.numeric),
                   .fns = list(min = min, median = median, mean = mean, std = sd,
                               q25 = ~ quantile(., 0.25), q75 = ~ quantile(., 0.75), max = max)))
```
### Lag evaluation
```{r}
lags <- 0:35
lagged_flu_state <- map_dfr(lags, function(lag) {
  # dt1 shifts the first variable (ER visits) in time, so this scans
  # lead/lag relationships between the two signals.
  epi_cor(joined, pct_flu_visits, conf_admission, cor_by = geo_value,
          dt1 = lag, use = "complete.obs", method = "spearman") %>%
    mutate(lag = .env$lag)
})
lagged_flu_state %>%
  group_by(lag) %>%
  summarize(mean = mean(cor, na.rm = TRUE)) %>%
  ggplot(aes(x = lag, y = mean)) +
  geom_line() +
  geom_point() +
  labs(x = "Lag", y = "Mean correlation",
       title = "Lag comparison for state spearman correlations for flu ER and Hosp admissions")
ggsave("flu_ER_admissions_state_lag_cor.pdf")
```
Somewhat unsurprisingly, the correlation is highest at the smallest lags.

## Correlations sliced by time
```{r}
correlations_time_flu <- epi_cor(joined, pct_flu_visits, conf_admission,
                                 cor_by = "time_value", use = "complete.obs", method = "spearman")
correlations_time_flu
ggplot(correlations_time_flu, aes(x = time_value, y = cor)) +
  geom_line() +
  lims(y = c(0, 1)) +
  labs(title = "Spearman Correlations between Flu ER visits and Flu hospital admissions")
ggsave("flu_ER_admissions_time_correlations.pdf")
```
Strangely, sliced by time, we get significantly lower correlations:
```{r}
correlations_time_flu %>%
  summarize(across(where(is.numeric),
                   .fns = list(min = min, median = median, mean = mean, std = sd,
                               q25 = ~ quantile(., 0.25), q75 = ~ quantile(., 0.75), max = max)))
```
This looks like a Simpson's-paradox-adjacent result: within any given location the two signals are fairly well correlated over time, but at any given time, correlating across locations suggests a much weaker relationship.
If the typical explanation applies, there should be large differences in the number of points per group.
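As a toy illustration of how this reversal can happen (purely synthetic data, not pipeline output): give every location a shared upward trend in both signals, but location-level baselines that run in opposite directions.
```{r}
set.seed(42)
n_geo <- 30
n_t <- 50
toy <- expand.grid(geo = seq_len(n_geo), t = seq_len(n_t)) %>%
  as_tibble() %>%
  mutate(
    # each location shares a common upward trend in both signals...
    x = t + 10 * geo + rnorm(n(), sd = 2),
    # ...but the location-level baselines of y run against those of x
    y = t - 10 * geo + rnorm(n(), sd = 2)
  )
# sliced by location: strongly positive (the shared trend dominates)
toy %>% group_by(geo) %>% summarize(cor = cor(x, y, method = "spearman")) %>%
  summarize(mean_cor = mean(cor))
# sliced by time: strongly negative (the opposing baselines dominate)
toy %>% group_by(t) %>% summarize(cor = cor(x, y, method = "spearman")) %>%
  summarize(mean_cor = mean(cor))
```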

So, getting the counts:
```{r}
joined %>%
  group_by(geo_value) %>%
  count() %>%
  arrange(n) %>%
  ungroup() %>%
  summarise(across(where(is.numeric), .fns = list(min = min, max = max)))
```
Each location has 82 observations, so the per-location counts are uniform.

```{r}
joined %>%
  group_by(time_value) %>%
  count() %>%
  arrange(n) %>%
  ungroup() %>%
  summarise(across(where(is.numeric), .fns = list(min = min, max = max)))
```
# Covid
```{r}
library(epiprocess)
nssp_data %>% pull(signal) %>% unique
nssp_state <- nssp_data %>%
  filter(geo_type == "state") %>%
  mutate(time_value = epidatr:::parse_api_week(time_value)) %>%
  as_epi_df(time_type = "week", geo_type = "state") %>%
  select(-missing_val, -geo_type)
nssp_covid_state <- nssp_state %>%
  filter(signal == "pct_ed_visits_covid") %>%
  select(-signal) %>%
  drop_na() %>%
  rename(pct_covid_visits = val) %>%
  as_epi_df(time_type = "week", geo_type = "state")
week_starts <- nssp_covid_state$time_value %>% unique()
covid_hhs_weekly <- covid_hhs %>%
  select(geo_value, time_value, value) %>%
  filter(time_value %in% week_starts) %>%
  rename(conf_admission = value) %>%
  drop_na() %>%
  as_epi_df(time_type = "week", geo_type = "state")
joined_covid <- nssp_covid_state %>% left_join(covid_hhs_weekly)
```

With the join done, let's look at the overall correlation:
```{r}
cor(joined_covid$pct_covid_visits, joined_covid$conf_admission, method = "spearman")
```
So the overall correlation is pretty high, but lower than for flu.

## Correlations sliced by state
```{r}
correlations_space_covid <- epi_cor(joined_covid, pct_covid_visits, conf_admission,
                                    cor_by = "geo_value", use = "complete.obs", method = "spearman")
library(maps) # For map data
states_map <- map_data("state")
mapped <- states_map %>%
  as_tibble() %>%
  mutate(geo_value = setNames(tolower(state.abb), tolower(state.name))[region]) %>%
  right_join(correlations_space_covid) %>%
  arrange(group, order)
library(viridis)
ggplot(mapped, aes(x = long, y = lat, group = group, fill = cor)) +
  geom_polygon(colour = "black") +
  scale_fill_viridis(discrete = FALSE, option = "viridis", limits = c(0, 1)) +
  coord_map("polyconic") +
  labs(title = "Spearman Correlations between covid ER visits and covid hospital admissions")
ggsave("covid_ER_admissions_state_correlations.pdf")
ggsave("covid_ER_admissions_state_correlations.png")
```
Over space, hospital admissions look highly correlated with ER visits, which makes sense: admissions frequently come in through the ER.
Summary statistics for the per-state correlations:
```{r}
correlations_space_covid %>%
  summarize(across(where(is.numeric),
                   .fns = list(min = min, median = median, mean = mean, std = sd,
                               q25 = ~ quantile(., 0.25), q75 = ~ quantile(., 0.75), max = max)))
```
### Lag evaluation
```{r}
lags <- 0:35
lagged_covid_state <- map_dfr(lags, function(lag) {
  # NB: dt1 = -lag here, whereas the flu section used dt1 = lag; the sign
  # convention differs between the two sections.
  epi_cor(joined_covid, pct_covid_visits, conf_admission, cor_by = geo_value,
          dt1 = -lag, use = "complete.obs", method = "spearman") %>%
    mutate(lag = .env$lag)
})
lagged_covid_state %>%
  group_by(lag) %>%
  summarize(mean = mean(cor, na.rm = TRUE)) %>%
  ggplot(aes(x = lag, y = mean)) +
  geom_line() +
  geom_point() +
  labs(x = "Lag", y = "Mean correlation",
       title = "Lag comparison for state spearman correlations for covid ER and Hosp admissions")
ggsave("covid_ER_admissions_state_lag_cor.pdf")
ggsave("covid_ER_admissions_state_lag_cor.png")
```
Somewhat unsurprisingly, the correlation is highest at the smallest lags, though it's significantly lower than in the flu case.

## Correlations sliced by time
```{r}
correlations_time_covid <- epi_cor(joined_covid, pct_covid_visits, conf_admission,
                                   cor_by = "time_value", use = "complete.obs", method = "spearman")
correlations_time_covid
ggplot(correlations_time_covid, aes(x = time_value, y = cor)) +
  geom_line() +
  lims(y = c(0, 1)) +
  labs(title = "Spearman Correlations between covid ER visits and covid hospital admissions")
ggsave("covid_ER_admissions_time_correlations.pdf")
ggsave("covid_ER_admissions_time_correlations.png")
```
Strangely, sliced by time, we get significantly lower correlations; some are even negative:
```{r}
correlations_time_covid %>%
  summarize(across(where(is.numeric),
                   .fns = list(min = min, median = median, mean = mean, std = sd,
                               q25 = ~ quantile(., 0.25), q75 = ~ quantile(., 0.75), max = max)))
```
As in the flu section, this looks like a Simpson's-paradox-adjacent result: within any given location the two signals are fairly well correlated over time, but at any given time, correlating across locations suggests a much weaker relationship.
If the typical explanation applies, there should be large differences in the number of points per group.

So, getting the counts:
```{r}
joined_covid %>%
  group_by(geo_value) %>%
  count() %>%
  arrange(n) %>%
  ungroup() %>%
  summarise(across(where(is.numeric), .fns = list(min = min, max = max)))
```
Each location has 82 observations, so the per-location counts are uniform.

```{r}
joined_covid %>%
  group_by(time_value) %>%
  count() %>%
  arrange(n) %>%
  ungroup() %>%
  summarise(across(where(is.numeric), .fns = list(min = min, max = max)))
```
6 binary files not shown.