--- title: "Streaming data into the demultiplexer" bibliography: ../inst/REFERENCES.bib author: - name: Jakob Peder Pettersen affiliation: - UiT, the Artic University of Norway, Department of Computer Science email: jakobpeder.pettersen@gmail.com output: BiocStyle::html_document: self_contained: yes toc: true toc_float: true toc_depth: 2 code_folding: show date: "`r doc_date()`" package: "`r pkg_ver('posDemux')`" vignette: > %\VignetteIndexEntry{Demultiplexing with streaming} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r setup, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, comment = "#>", crop = NULL # Related to # https://stat.ethz.ch/pipermail/bioc-devel/2020-April/016656.html ) ``` ```{r vignetteSetup, echo=FALSE, message=FALSE, warning = FALSE} ## Bib setup library("RefManageR") ## Write bibliography information bib_packages <- c( R = citation(), BiocStyle = citation("BiocStyle")[1], knitr = citation("knitr")[1], RefManageR = citation("RefManageR")[1], rmarkdown = citation("rmarkdown")[1], sessioninfo = citation("sessioninfo")[1], testthat = citation("testthat")[1] ) bib <- ReadBib("../inst/REFERENCES.bib") # for (pkg in names(bib_packages)) { # bib[[pkg]] <- bib_packages[[pkg]] # } BibOptions( check.entries = FALSE, style = "markdown", cite.style = "numeric", bib.style = "numeric" ) ``` # Why streaming? Given that sequence files can consist of gigabytes of data, storing everything in memory at once can prove to be too much for the computer. Fortunately, for demultiplexing of barcodes, we can demultiplex the barcodes independently of each other. In theory, we could read the entries from a FASTQ file one by one, and for each of them call the demultiplexer and output the barcode assignment to a file. However, due to overhead in file operations and the `R` interpreter, the most efficient option is to process the files in chunks of a predetermined size. If you have not read the main package vignette yet (`vignette("posDemux")`), please do so before proceeding. ## Which chunk size to use? The chunk size should be large enough for the constant overhead of initiating a file read and interpreting the required `R` instructions to be neglectable. Still, the chunk size should be small enough to only make a modest memory footprint. In practice, with modern computers having gigabytes of RAM, we find a chunk size of $2\cdot10^5$ reads to be a reasonable choice. The example datasets used in this vignette are far too small to require streaming (including a dataset of realistic size would make the package exceed the maximum package size), so for purely demonstrational purposes, we choose a chunk size of $10^4$ reads. ## Overview of the streaming API The streaming API of `posDemux` is focused around two user-supplied functions: * The loader: This function is responsible for grabbing a chunk of the input file of the sequences and returning a `XStringSet` object of the sequences. Also, this function determines when to terminate the streaming, typically done when there are no sequences left. * The archiver: This function is responsible for writing the results of the demultiplexer of a chunk to file, typically in the form of a barcode table. Once the loader and archiver are specified and combined with the demultiplexer, they can generate the barcode table chunk by chunk. However, we still want to generate the summary statistics and frequency table as if all the reads were demultiplexed simultaneously. 
If the chunks are demultiplexed independently, `create_summary_res()` and `create_freq_table()` don't work as intended. That is where the core function of the streaming API, `streaming_demultiplex()`, comes into play. It accepts the loader and the archiver as callback functions, communicates with the demultiplexer, and does the bookkeeping to construct the summary statistics and frequency table. Hence, for each chunk, the loader provides the sequences to demultiplex; the streaming logic feeds these reads into the demultiplexer, updates its record of observed barcodes, and provides the results to the archiver, which writes them to the barcode table. This process is illustrated in Figure \@ref(fig:structure). For communicating between the loader and archiver, a user-defined state object is passed through `streaming_demultiplex()`. The initial value of this object is provided to `streaming_demultiplex()` as the argument `state_init`.

```{r structure, echo = FALSE, out.width = "100%", fig.cap = "Components of the streaming API"}
knitr::include_graphics("streaming_structure.svg")
```

# Streaming using default callbacks

Although the option of specifying the loader and archiver gives great flexibility, it does so at the cost of convenience, as the loader and archiver can be challenging to write. For this reason, `posDemux` provides a default set of streaming callbacks, available through `streaming_callbacks()`. This is in fact a function factory, which returns a list with the following elements:

* `state_init`: The initial state to provide to `streaming_demultiplex()`
* `loader`: Loader function
* `archiver`: Archiver function

## Assumptions

The callbacks are designed to be useful for a typical use case where:

* The reads are contained in a single FASTQ file on disk.
* The barcode table contains columns with the read names (`read`), the sequences of all payloads (e.g. `UMI`), and the barcode assignments (e.g. `BC3`, `BC2`, `BC1`).
* Read names with spaces are truncated by the loader such that only the part of the read name before the space is kept. This is done because the information preceding the space is typically enough to unambiguously identify the read, while the information after the space usually contains irrelevant auxiliary information.
* The archiver can optionally display progress on how many reads have been processed after each chunk.

If this does not fit your needs, please refer to the section [Streaming using custom callbacks].

## Obtaining the callbacks

As mentioned, `streaming_callbacks()` is a function factory: we must specify the path to the FASTQ input file and the barcode table output file for it to generate useful callbacks. In addition, the chunk size is set when the callbacks are created.
We use the same dataset as in the main package vignette (`vignette("demultiplexing")`) and obtain the callbacks as follows:

```{r default_callbacks}
library(posDemux)
input_fastq <- system.file("extdata", "PETRI-seq_forward_reads.fq.gz",
    package = "posDemux"
)
output_barcode_table <- tempfile(
    pattern = "barcode_table",
    fileext = ".txt"
)
default_callbacks <- streaming_callbacks(
    input_file = input_fastq,
    output_table_file = output_barcode_table,
    chunk_size = 1e+4,
    verbose = TRUE
)
```

## Running the demultiplexing

With the callbacks in place, we specify the barcodes and the segments and feed them into the streaming framework:

```{r run_streaming}
suppressPackageStartupMessages({
    library(purrr)
    library(Biostrings)
})
barcode_files <- system.file(
    "extdata/PETRI-seq_barcodes",
    c(bc1 = "bc1.fa", bc2 = "bc2.fa", bc3 = "bc3.fa"),
    package = "posDemux"
)
names(barcode_files) <- paste0("bc", 1L:3L)
barcode_index <- map(barcode_files, readDNAStringSet)
barcodes <- barcode_index[c("bc3", "bc2", "bc1")]
sequence_annotation <- c(UMI = "P", "B", "A", "B", "A", "B", "A")
segment_lengths <- c(7L, 7L, 15L, 7L, 14L, 7L, NA_integer_)

streaming_summary_res <- streaming_demultiplex(
    state_init = default_callbacks$state_init,
    loader = default_callbacks$loader,
    archiver = default_callbacks$archiver,
    barcodes = barcodes,
    allowed_mismatches = 1L,
    segments = sequence_annotation,
    segment_lengths = segment_lengths
)
```

## Displaying the result from streaming

From the return value of `streaming_demultiplex()`, we obtain the frequency table

```{r streaming_freq_table}
head(streaming_summary_res$freq_table)
```

and the summary printout:

```{r streaming_summary_res}
streaming_summary_res$summary_res
```

# Streaming the barcode table

## Considerations about size of tables

The frequency table from a realistic PETRI-seq dataset is relatively small (just a few megabytes) and should therefore not pose any substantial memory usage problems. On the other hand, the barcode table will have one row for each read processed, making its memory footprint closer to the size of the original FASTQ file. For this reason, the default archiver writes each chunk of the barcode table to disk, thus avoiding keeping the table in memory.

However, the barcode table will be required by some downstream step of the pipeline. Typically, this will be frequency filtering. The good news is that the cutoff determination process only needs the lightweight frequency table. The bad news is that the full barcode table is required when determining which reads to keep after the cutoff is selected. If we want to avoid heavy load on RAM, we must stream the barcode table in chunks, determine for each chunk which reads to keep, and append the results to another file.

## Subsetting the frequency table

We begin this example by subsetting the frequency table. In order not to repeat ourselves, we refer to the main package vignette (`vignette("demultiplexing")`) for the considerations behind this process.

```{r freq_table_extract}
freq_table <- streaming_summary_res$freq_table
bc_cutoff <- 500L
selected_freq_table <- freq_table[seq_len(bc_cutoff), ]
```

## Writing to database

We typically want to create a copy of the barcode table including only those reads which are selected by the barcode frequency and thus match a row in `selected_freq_table`. We **could** use the most direct approach and provide the selected barcode table in the same file format, just with the desired rows kept.
However, this could become impractical for downstream analysis, since using this new table for lookups would require loading it all into memory, causing the same problem one level down. For memory-constrained environments, we therefore suggest another idea: dumping the data into a SQLite database which can later be queried. For simplicity, we paste all the barcode designations together in one column.

We begin by setting up the database:

```{r database_setup}
suppressPackageStartupMessages({
    library(DBI)
})
output_database_path <- tempfile(
    pattern = "selected_barcode_table",
    fileext = ".sqlite"
)
output_db <- dbConnect(RSQLite::SQLite(), output_database_path)
```

To stream the barcode table and write the results to the database, we use the `chunked` package:

```{r database_write}
chunk_size <- 1e4
suppressPackageStartupMessages({
    library(chunked)
    library(magrittr)
})
read_table_chunkwise(
    file = output_barcode_table,
    header = TRUE,
    sep = "\t",
    chunk_size = chunk_size
) %>%
    # For use within a pipeline, it is more convenient to use inner_join()
    # than posDemux::row_match() and it achieves the same result
    inner_join(selected_freq_table) %>%
    mutate(
        celltag = do.call(
            paste,
            c(barcodes %>% names() %>% all_of() %>% across(), sep = "_")
        )
    ) %>%
    select(read, UMI, celltag) %>%
    write_chunkwise(dbplyr::src_dbi(output_db), "selected_barcodes")
```

Finally, we create an index on the read names so that lookups are fast, and close the database connection:

```{r database_close}
dbExecute(output_db, "CREATE INDEX idx ON selected_barcodes(read)") %>%
    invisible()
dbDisconnect(output_db)
```
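To show how such a lookup could work downstream, here is a small sketch that reopens the database and fetches the entry for a single read. The read name below is a made-up placeholder (hence the chunk is not evaluated); substitute a read name that actually occurs in your data. Thanks to the index created above, such lookups remain fast even for large tables.

```{r database_query, eval = FALSE}
## Sketch of a downstream lookup. "SOME_READ_NAME" is a placeholder and
## must be replaced with a read name that is present in the table.
lookup_db <- dbConnect(RSQLite::SQLite(), output_database_path)
dbGetQuery(
    lookup_db,
    "SELECT read, UMI, celltag FROM selected_barcodes WHERE read = ?",
    params = list("SOME_READ_NAME")
)
dbDisconnect(lookup_db)
```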
# Streaming using custom callbacks

## Desired properties

In cases where the default callbacks are not sufficient, it is possible for the user to write custom callback functions. For our example, we will use the same sample dataset as before, but pretend the reads for some reason come in a FASTA file (which the default loader does not support). In addition, we want the UMIs to go into their own FASTA file instead of being a column in the barcode table. Unlike the default callbacks, we don't truncate read names.

## The state object

For progress printouts, we want the state object to contain the total number of reads seen so far and the number of reads where demultiplexing was successful. For the final numbers, this is not strictly necessary, since the streaming framework provides that information itself when it finishes, but in our specific case the loader also needs the running read count to know where to begin reading the FASTA file for each chunk. Thus, we want the state object to contain the following fields, which are both integers:

* `total_reads`
* `demultiplexed_reads`

`total_reads` will be updated by the loader, whereas `demultiplexed_reads` will be updated by the archiver at the end of each chunk. Furthermore, the archiver needs to write the column headers to the barcode table when first writing to it, so it must know whether it is working on its first chunk. This gives rise to the logical flag `output_table_initialized`.

We thus initialize the state as follows:

```{r state_init}
custom_state_init <- list(
    total_reads = 0L,
    demultiplexed_reads = 0L,
    output_table_initialized = FALSE
)
```

## The loader

The loader is a function which accepts the state as its sole parameter and outputs a list with three fields:

* `state`: To be passed into the archiver.
* `sequences`: The sequences in the chunk as a `Biostrings::DNAStringSet` object.
* `should_terminate`: A logical flag to signal to the streaming framework that it should terminate. Note that this also means the loader is in charge of the termination criterion.

We decide to hard-code the loader to parse $10^4$ reads for each chunk.

```{r loader}
input_fasta <- system.file("extdata", "PETRI-seq_forward_reads.fa.gz",
    package = "posDemux"
)
loader <- function(state) {
    n_reads_per_chunk <- 1e4
    if (!state$output_table_initialized) {
        message("Starting to load sequences")
    }
    chunk <- Biostrings::readDNAStringSet(
        filepath = input_fasta,
        format = "fasta",
        nrec = n_reads_per_chunk,
        skip = state$total_reads
    )
    n_reads_in_chunk <- length(chunk)
    if (n_reads_in_chunk == 0L && state$output_table_initialized) {
        # The case when the initial chunk is empty is given special
        # treatment, since we want to create the table regardless.
        # No more reads to process; make the outer framework return.
        final_res <- list(
            state = state,
            sequences = NULL,
            should_terminate = TRUE
        )
        message("Done demultiplexing")
        return(final_res)
    }
    state$total_reads <- state$total_reads + n_reads_in_chunk
    list(
        state = state,
        sequences = chunk,
        should_terminate = FALSE
    )
}
```

## The archiver

The archiver has the state obtained from the loader as its first argument and the results from `filter_demultiplex_res()` of the chunk as its second argument. Its return value is the state to provide to the loader when it continues with the next chunk.

```{r archiver}
output_umi_file <- tempfile(pattern = "umi.fa", fileext = ".txt")
custom_output_table <- tempfile(
    pattern = "barcode_table_custom",
    fileext = ".txt"
)
archiver <- function(state, filtered_res) {
    barcode_matrix <- filtered_res$demultiplex_res$assigned_barcodes
    barcode_names <- colnames(barcode_matrix)
    read_names <- rownames(barcode_matrix)
    # If the table has no rows, we may risk getting a NULL value
    if (is.null(read_names)) {
        read_names <- character()
    }
    barcode_table <- as.data.frame(barcode_matrix)
    read_name_table <- data.frame(read = read_names)
    umis <- filtered_res$demultiplex_res$payload$UMI
    chunk_table <- cbind(read_name_table, barcode_table)
    if (!state$output_table_initialized) {
        append <- FALSE
        state$output_table_initialized <- TRUE
    } else {
        append <- TRUE
    }
    Biostrings::writeXStringSet(
        umis,
        filepath = output_umi_file,
        append = TRUE
    )
    readr::write_tsv(
        x = chunk_table,
        file = custom_output_table,
        append = append,
        col_names = !append,
        eol = "\n"
    )
    state <- within(state, {
        demultiplexed_reads <- demultiplexed_reads + nrow(barcode_matrix)
        paste0(
            "Processed {total_reads} reads,",
            " ",
            "successfully demultiplexed {demultiplexed_reads} reads so far..."
        ) %>%
            glue::glue() %>%
            message()
    })
    state
}
```

## Putting it all together

With the initial state and the callbacks defined, we are now ready to put them to use:

```{r combining_custom_callbacks}
custom_summary_res <- streaming_demultiplex(
    state_init = custom_state_init,
    loader = loader,
    archiver = archiver,
    barcodes = barcodes,
    allowed_mismatches = 1L,
    segments = sequence_annotation,
    segment_lengths = segment_lengths
)
```
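Even though our own counters were only used for the progress messages, the return value of `streaming_demultiplex()` still carries the overall bookkeeping. As a quick sanity check (assuming the returned object has the same `freq_table` and `summary_res` fields as in the default-callback run above), we can inspect it:

```{r custom_streaming_summary}
## Should mirror the summary obtained with the default callbacks earlier
head(custom_summary_res$freq_table)
custom_summary_res$summary_res
```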
## Verifying results

We finally load the files we wrote to in order to verify that they contain what we desire.

We load the barcode table:

```{r verify_barcode_table}
readr::read_tsv(custom_output_table, show_col_types = FALSE)
```

and the UMI sequences:

```{r verify_umi}
Biostrings::readDNAStringSet(output_umi_file, format = "fasta")
```

# Reproducibility

The `r Biocpkg("posDemux")` package `r Citep(bib[["posDemux"]])` was made possible thanks to:

- R `r Citep(bib[["R"]])`
- `r Biocpkg("BiocStyle")` `r Citep(bib[["BiocStyle"]])`
- `r CRANpkg("knitr")` `r Citep(bib[["knitr"]])`
- `r CRANpkg("RefManageR")` `r Citep(bib[["RefManageR"]])`
- `r CRANpkg("rmarkdown")` `r Citep(bib[["rmarkdown"]])`
- `r CRANpkg("sessioninfo")` `r Citep(bib[["sessioninfo"]])`
- `r CRANpkg("testthat")` `r Citep(bib[["testthat"]])`

This package was developed using `r BiocStyle::Biocpkg("biocthis")`.

`R` session information:

```{r reproduce3, echo=FALSE}
## Session info
library("sessioninfo")
options(width = 120)
session_info()
```

# Bibliography

This vignette was generated using `r Biocpkg("BiocStyle")` `r Citep(bib[["BiocStyle"]])` with `r CRANpkg("knitr")` `r Citep(bib[["knitr"]])` and `r CRANpkg("rmarkdown")` `r Citep(bib[["rmarkdown"]])` running behind the scenes.