## ----setup, include = FALSE--------------------------------------------------
knitr::opts_chunk$set(
    collapse = TRUE,
    comment = "#>",
    crop = NULL
    # Related to
    # https://stat.ethz.ch/pipermail/bioc-devel/2020-April/016656.html
)

## ----vignetteSetup, echo=FALSE, message=FALSE, warning = FALSE---------------
## Bib setup
library("RefManageR")

## Write bibliography information
bib_packages <- c(
    R = citation(),
    BiocStyle = citation("BiocStyle")[1],
    knitr = citation("knitr")[1],
    RefManageR = citation("RefManageR")[1],
    rmarkdown = citation("rmarkdown")[1],
    sessioninfo = citation("sessioninfo")[1],
    testthat = citation("testthat")[1]
)
bib <- ReadBib("../inst/REFERENCES.bib")
# for (pkg in names(bib_packages)) {
#     bib[[pkg]] <- bib_packages[[pkg]]
# }
BibOptions(
    check.entries = FALSE,
    style = "markdown",
    cite.style = "numeric",
    bib.style = "numeric"
)

## ----structure, echo = FALSE, out.width = "100%", fig.cap = "Components of the streaming API"----
knitr::include_graphics("streaming_structure.svg")

## ----default_callbacks-------------------------------------------------------
library(posDemux)

input_fastq <- system.file("extdata", "PETRI-seq_forward_reads.fq.gz",
    package = "posDemux"
)
output_barcode_table <- tempfile(
    pattern = "barcode_table",
    fileext = ".txt"
)

default_callbacks <- streaming_callbacks(
    input_file = input_fastq,
    output_table_file = output_barcode_table,
    chunk_size = 1e+4,
    verbose = TRUE
)

## ----run_streaming-----------------------------------------------------------
suppressPackageStartupMessages({
    library(purrr)
    library(Biostrings)
})

barcode_files <- system.file(
    "extdata/PETRI-seq_barcodes",
    c(bc1 = "bc1.fa", bc2 = "bc2.fa", bc3 = "bc3.fa"),
    package = "posDemux"
)
names(barcode_files) <- paste0("bc", 1L:3L)
barcode_index <- map(barcode_files, readDNAStringSet)
barcodes <- barcode_index[c("bc3", "bc2", "bc1")]

sequence_annotation <- c(UMI = "P", "B", "A", "B", "A", "B", "A")
segment_lengths <- c(7L, 7L, 15L, 7L, 14L, 7L, NA_integer_)

streaming_summary_res <- streaming_demultiplex(
    state_init = default_callbacks$state_init,
    loader = default_callbacks$loader,
    archiver = default_callbacks$archiver,
    barcodes = barcodes,
    allowed_mismatches = 1L,
    segments = sequence_annotation,
    segment_lengths = segment_lengths
)

## ----streaming_freq_table----------------------------------------------------
head(streaming_summary_res$freq_table)

## ----streaming_summary_res---------------------------------------------------
streaming_summary_res$summary_res

## ----freq_table_extract------------------------------------------------------
freq_table <- streaming_summary_res$freq_table
bc_cutoff <- 500L
selected_freq_table <- freq_table[seq_len(bc_cutoff), ]
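## Illustrative sketch only: a minimal knee-style look at the barcode
## frequency distribution to sanity-check the hard-coded `bc_cutoff`.
## It assumes the last column of `freq_table` holds the per-combination
## read counts; adjust the column selection to the actual table layout.
barcode_counts <- sort(freq_table[[ncol(freq_table)]], decreasing = TRUE)
plot(
    seq_along(barcode_counts), barcode_counts,
    log = "xy", type = "l",
    xlab = "Barcode rank", ylab = "Read count"
)
abline(v = bc_cutoff, lty = 2)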
## ----database_setup----------------------------------------------------------
suppressPackageStartupMessages({
    library(DBI)
})

output_database_path <- tempfile(
    pattern = "selected_barcode_table",
    fileext = ".sqlite"
)
output_db <- dbConnect(RSQLite::SQLite(), output_database_path)

## ----database_write----------------------------------------------------------
chunk_size <- 1e4

suppressPackageStartupMessages({
    library(chunked)
    library(dplyr)
    library(magrittr)
})

read_table_chunkwise(
    file = output_barcode_table,
    header = TRUE,
    sep = "\t",
    chunk_size = chunk_size
) %>%
    # For use within a pipeline, it is more convenient to use inner_join()
    # than posDemux::row_match(), and it achieves the same result
    inner_join(selected_freq_table) %>%
    mutate(
        celltag = do.call(
            paste,
            c(barcodes %>% names() %>% all_of() %>% across(), sep = "_")
        )
    ) %>%
    select(read, UMI, celltag) %>%
    write_chunkwise(dbplyr::src_dbi(output_db), "selected_barcodes")

## ----database_close-----------------------------------------------------------
dbExecute(output_db, "CREATE INDEX idx ON selected_barcodes(read)") %>%
    invisible()
dbDisconnect(output_db)
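## Illustrative sketch only: reopen the SQLite file and peek at a few of the
## stored records. The table name and columns follow the write_chunkwise()
## and select() calls above.
check_db <- dbConnect(RSQLite::SQLite(), output_database_path)
dbGetQuery(check_db, "SELECT read, UMI, celltag FROM selected_barcodes LIMIT 5")
dbDisconnect(check_db)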
## ----state_init---------------------------------------------------------------
custom_state_init <- list(
    total_reads = 0L,
    demultiplexed_reads = 0L,
    output_table_initialized = FALSE
)

## ----loader--------------------------------------------------------------------
input_fasta <- system.file("extdata", "PETRI-seq_forward_reads.fa.gz",
    package = "posDemux"
)

loader <- function(state) {
    n_reads_per_chunk <- 1e4
    if (!state$output_table_initialized) {
        message("Starting to load sequences")
    }
    chunk <- Biostrings::readDNAStringSet(
        filepath = input_fasta,
        format = "fasta",
        nrec = n_reads_per_chunk,
        skip = state$total_reads
    )
    n_reads_in_chunk <- length(chunk)
    if (n_reads_in_chunk == 0L && state$output_table_initialized) {
        # The case when the initial chunk is empty is given special treatment,
        # since we want to create the table regardless.
        # No more reads to process: make the outer framework return
        final_res <- list(
            state = state,
            sequences = NULL,
            should_terminate = TRUE
        )
        message("Done demultiplexing")
        return(final_res)
    }
    state$total_reads <- state$total_reads + n_reads_in_chunk
    list(
        state = state,
        sequences = chunk,
        should_terminate = FALSE
    )
}

## ----archiver-------------------------------------------------------------------
output_umi_file <- tempfile(pattern = "umi.fa", fileext = ".txt")
custom_output_table <- tempfile(
    pattern = "barcode_table_custom",
    fileext = ".txt"
)

archiver <- function(state, filtered_res) {
    barcode_matrix <- filtered_res$demultiplex_res$assigned_barcodes
    barcode_names <- colnames(barcode_matrix)
    read_names <- rownames(barcode_matrix)
    # If the table has no rows, we may risk getting a NULL value
    if (is.null(read_names)) {
        read_names <- character()
    }
    barcode_table <- as.data.frame(barcode_matrix)
    read_name_table <- data.frame(read = read_names)
    umis <- filtered_res$demultiplex_res$payload$UMI
    chunk_table <- cbind(read_name_table, barcode_table)
    if (!state$output_table_initialized) {
        append <- FALSE
        state$output_table_initialized <- TRUE
    } else {
        append <- TRUE
    }
    Biostrings::writeXStringSet(
        umis,
        filepath = output_umi_file,
        append = TRUE
    )
    readr::write_tsv(
        x = chunk_table,
        file = custom_output_table,
        append = append,
        col_names = !append,
        eol = "\n"
    )
    state <- within(state, {
        demultiplexed_reads <- demultiplexed_reads + nrow(barcode_matrix)
        paste0(
            "Processed {total_reads} reads,", " ",
            "successfully demultiplexed {demultiplexed_reads} reads so far..."
        ) %>%
            glue::glue() %>%
            message()
    })
    state
}

## ----combining_custom_callbacks--------------------------------------------------
custom_summary_res <- streaming_demultiplex(
    state_init = custom_state_init,
    loader = loader,
    archiver = archiver,
    barcodes = barcodes,
    allowed_mismatches = 1L,
    segments = sequence_annotation,
    segment_lengths = segment_lengths
)

## ----verify_barcode_table---------------------------------------------------------
readr::read_tsv(custom_output_table, show_col_types = FALSE)

## ----verify_umi--------------------------------------------------------------------
Biostrings::readDNAStringSet(output_umi_file, format = "fasta")

## ----reproduce3, echo=FALSE---------------------------------------------------------
## Session info
library("sessioninfo")
options(width = 120)
session_info()