---
title: "Automation and Visualization of Flow Cytometry Data Analysis Pipelines"
author:
    - name: Philippe Hauchamps
    - name: Laurent Gatto
package: CytoPipeline
abstract: >
 This vignette describes the functionality implemented in the `CytoPipeline`
 package. `CytoPipeline` provides support for automation and visualization of 
 flow cytometry data analysis pipelines. In the current state, the package 
 focuses on the pre-processing and quality control part. 
 This vignette is distributed under a CC BY-SA 4.0 license.
output:
  BiocStyle::html_document:
    toc_float: true
bibliography: CytoPipeline.bib
vignette: >
  %\VignetteIndexEntry{Automation and Visualization of Flow Cytometry Data Analysis Pipelines}
  %\VignetteEngine{knitr::rmarkdown}
  %%\VignetteKeywords{FlowCytometry, Preprocessing, QualityControl, WorkflowStep, ImmunoOncology, Software, Visualization}
  %\VignetteEncoding{UTF-8}
---

```{r style, echo = FALSE, results = 'asis'}
BiocStyle::markdown()
```
# Installation

To install this package, start R and enter (uncommented):

```{r}
# if (!require("BiocManager", quietly = TRUE))
#     install.packages("BiocManager")
# 
# BiocManager::install("CytoPipeline")
```

Note that CytoPipeline imports ggplot2 (>= 3.4.1).  
The version requirement is due to a bug in version 3.4.0., 
affecting `ggplot2::geom_hex()`.

# Introduction

The `CytoPipeline` package provides infrastructure to support the definition, 
run and standardized visualization of pre-processing and quality control 
pipelines for flow cytometry data. This infrastructure consists of two main S4 
classes, i.e. `CytoPipeline` and `CytoProcessingStep`, as well as dedicated 
wrapper functions around selected third-party package methods often used to 
implement these pre-processing steps.

In the following sections, we demonstrate how to create a `CytoPipeline` 
object implementing a simple pre-processing pipeline, how to run it and 
how to retrieve and visualize the results after each step. 

# Example dataset
The example dataset that will be used throughout this vignette is derived from 
a reference public dataset accompanying the OMIP-021 (Optimized Multicolor 
Immunofluorescence Panel 021) article [@Gherardin2014-pj].  

A sub-sample of this public dataset is built-in in the `CytoPipeline` 
package, as the OMIP021 dataset. 
See the `MakeOMIP021Samples.R` script for more details 
on how the `OMIP021` dataset was created. This script is to be found 
in the `script` subdirectory in the `CytoPipeline` package installation path.

Note that in the `CytoPipeline`package, as in the current vignette, 
matrices of flow cytometry events intensities are stored as 
`flowCore::flowFrame` objects [@flowCore]. 

# Example of pre-processing and QC pipelines
Let's assume that we want to pre-process the two samples of the `OMIP021` 
dataset, and let's assume that we want to compare what we would obtain when
pre-processing these files using two different QC methods.    

In the first pre-processing pipeline, we will use the flowAI QC method 
[@Monaco2016-vo], while in the second pipeline, we will use the PeacoQC method 
[@Emmaneel2021-xy]. Note that when we here refer to QC method, we mean the 
algorithm used to ensure stability (stationarity) of the channel signals 
in time.

In both pipelines, the first part consists in estimating appropriate scale
transformation functions for all channels present in the sample `flowFrame`.
In order to do this, we propose the following *scale transformation processing 
queue* (Fig. 1):   

- reading the two samples `.fcs` files
- removing the margin events from each file
- applying compensation for each file
- aggregating and sub-sampling from each file
- estimating the scale transformations from the aggregated 
and sub-sampled data   

```{r scaleTransformQueueDisplay, results='markup', fig.cap="Scale transform processing queue", echo=FALSE, out.width='75%', fig.align='center', fig.wide = TRUE}
knitr::include_graphics("figs/scaleTransformQueue.png", error = FALSE)
```

When this first part is done, one can apply pre-processing for each file 
one by one. However, depending on the choice of QC method, the order of steps 
needs to be slightly different:

- when using flowAI, it is advised to eliminate the 'bad events' starting from
raw data (see [@Monaco2016-vo])
- when using PeacoQC, it is advised to eliminate the 'bad events' from already
compensated and scale transformed data (see [@Emmaneel2021-xy])

Therefore, we propose the following *pre-processing queues* represented in 
Fig. 2.

```{r preProcessingQueueDisplay, results='markup', fig.cap="Pre-processing queue for two different pipeline settings", echo=FALSE, out.width='100%', fig.align='center', fig.wide = TRUE}
knitr::include_graphics("figs/preProcessingQueues.png", error = FALSE)
```

# Building the CytoPipeline

`CytoPipeline` is the central S4 class used in the `CytoPipeline` package to 
represent a flow cytometry pre-processing pipeline. The main slots of
`CytoPipeline` objects are :    

- an `experimentName`, which gives a name to a particular user definition 
of a pre-processing pipeline. The *experiment* here, is not related to an assay
experiment, but refers to a specific way to design a pipeline. For example, in
the current use case, we will define two `experimentName`s, one to refer to the
flowAI pipeline, and another one to refer to the PeacoQC pipeline 
(see previous section);   

- a vector of `sampleFiles`, which are `.fcs` raw data files on which one need 
to run the pre-processing pipeline;  

- two processing queues, i.e. a `scaleTransformProcessingQueue`, and a 
`flowFramesPreProcessingQueue`, which correspond to the two parts described in 
previous section. Each of these queues are composed of one or several 
`CytoProcessingStep` objects, will be processed in linear sequence, the output 
of one step being the input of the next step.   

Note there are important differences between the two processing queues. 
On the one hand, the `scaleTransformProcessingQueue` takes the vector of all 
sample files as an input, and will be executed first, and only once. 
On the other hand, the `flowFramesPreProcessingQueue` will be run after the 
scale transformation processing queue, on each sample file one after the other, 
within a loop. The final output of the `scaleTransformProcessingQueue`, which 
should be a `flowCore::tranformList`, is also provided as input to the 
`flowFramesPreProcessingQueue`, by convention.      

In the next subsections, we show the different steps involved in creating a
`CytoPipeline` object.

## preliminaries: paths definition

In the following code, `rawDataDir` refers to the directory in which the `.fcs`
raw data files are stored. `workDir` will be used as root directory to store 
the disk cache. Indeed, when running the `CytoPipeline` objects, all the 
different step outputs will be stored in a `BiocFileCache` instance, in a 
sub-directory that will be created in `workDir`and of which the name will be 
set to the pipeline `experimentName`.

```{r pathsDef}
library(CytoPipeline)

# raw data
rawDataDir <- system.file("extdata", package = "CytoPipeline")
# output files
workDir <- suppressMessages(base::tempdir())
```

## first method: step by step, using CytoPipeline methods

In this sub-section, we build a `CytoPipeline` object and successively add 
`CytoProcessingStep` objects to the two different processing queues. We do this
for the PeacoQC pipeline.

```{r CytoPipelineSteps}

# main parameters : sample files and output files

experimentName <- "OMIP021_PeacoQC"
sampleFiles <- file.path(rawDataDir, list.files(rawDataDir,
                                                pattern = "Donor"))

pipL_PeacoQC <- CytoPipeline(experimentName = experimentName,
                             sampleFiles = sampleFiles)

### SCALE TRANSFORMATION STEPS ###

pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "scale transform",
        CytoProcessingStep(
            name = "flowframe_read",
            FUN = "readSampleFiles",
            ARGS = list(
                whichSamples = "all",
                truncate_max_range = FALSE,
                min.limit = NULL
            )
        )
    )

pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "scale transform",
        CytoProcessingStep(
            name = "remove_margins",
            FUN = "removeMarginsPeacoQC",
            ARGS = list()
        )
    )

pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "scale transform",
        CytoProcessingStep(
            name = "compensate",
            FUN = "compensateFromMatrix",
            ARGS = list(matrixSource = "fcs")
        )
    )

pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "scale transform",
        CytoProcessingStep(
            name = "flowframe_aggregate",
            FUN = "aggregateAndSample",
            ARGS = list(
                nTotalEvents = 10000,
                seed = 0
            )
        )
    )

pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "scale transform",
        CytoProcessingStep(
            name = "scale_transform_estimate",
            FUN = "estimateScaleTransforms",
            ARGS = list(
                fluoMethod = "estimateLogicle",
                scatterMethod = "linear",
                scatterRefMarker = "BV785 - CD3"
            )
        )
    )

### FLOW FRAME PRE-PROCESSING STEPS ###

pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "pre-processing",
        CytoProcessingStep(
            name = "flowframe_read",
            FUN = "readSampleFiles",
            ARGS = list(
                truncate_max_range = FALSE,
                min.limit = NULL
            )
        )
    )


pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "pre-processing",
        CytoProcessingStep(
            name = "remove_margins",
            FUN = "removeMarginsPeacoQC",
            ARGS = list()
        )
    )

pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "pre-processing",
        CytoProcessingStep(
            name = "compensate",
            FUN = "compensateFromMatrix",
            ARGS = list(matrixSource = "fcs")
        )
    )

pipL_PeacoQC <-
    addProcessingStep(
        pipL_PeacoQC,
        whichQueue = "pre-processing",
        CytoProcessingStep(
            name = "perform_QC",
            FUN = "qualityControlPeacoQC",
            ARGS = list(
                preTransform = TRUE,
                min_cells = 150, # default
                max_bins = 500, # default
                step = 500, # default,
                MAD = 6, # default
                IT_limit = 0.55, # default
                force_IT = 150, # default
                peak_removal = 0.3333, # default
                min_nr_bins_peakdetection = 10 # default
            )
        )
    )

pipL_PeacoQC <-
    addProcessingStep(
        pipL_PeacoQC,
        whichQueue = "pre-processing",
        CytoProcessingStep(
            name = "remove_doublets",
            FUN = "removeDoubletsCytoPipeline",
            ARGS = list(
                areaChannels = c("FSC-A", "SSC-A"),
                heightChannels = c("FSC-H", "SSC-H"),
                nmads = c(3, 5))
            )
    )
                    

                
pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "pre-processing",
        CytoProcessingStep(
            name = "remove_debris",
            FUN = "removeDebrisManualGate",
            ARGS = list(
                FSCChannel = "FSC-A",
                SSCChannel = "SSC-A",
                gateData =  c(73615, 110174, 213000, 201000, 126000,
                              47679, 260500, 260500, 113000, 35000)
            )
        )
    )

pipL_PeacoQC <-
    addProcessingStep(pipL_PeacoQC,
        whichQueue = "pre-processing",
        CytoProcessingStep(
            name = "remove_dead_cells",
            FUN = "removeDeadCellsManualGate",
            ARGS = list(
                FSCChannel = "FSC-A",
                LDMarker = "L/D Aqua - Viability",
                gateData = c(0, 0, 250000, 250000,
                             0, 650, 650, 0)
            )
        )
    )


```


## second method: in one go, using JSON file input 

In this sub-section, we build the flowAI pipeline, this time using a JSON
file as an input. Note that the `experimentName` and `sampleFiles` are here 
specified in the JSON file itself. This is not necessary, as 
one could well specify the processing steps only in the JSON file, and 
pass the `experimentName` and `sampleFiles` directly 
in the `CytoPipeline` constructor.

```{r CytoPipelineJson}

jsonDir <- rawDataDir

# creation on CytoPipeline object,
# using json file as input
pipL_flowAI <-
  CytoPipeline(file.path(jsonDir, "OMIP021_flowAI_pipeline.json"),
               experimentName = "OMIP021_flowAI",
               sampleFiles = sampleFiles)


```

# Executing pipelines

## Executing PeacoQC pipeline

Note: executing the next statement might generate some warnings.  
These are generated by the `PeacoQC method`, are highly dependent 
on the shape of the data investigated, and can safely be ignored here.

```{r executePeacoQC}
# execute PeacoQC pipeline
execute(pipL_PeacoQC, path = workDir)
```


## Executing flowAI pipeline

Note: again this might generate some warnings, due to flowAI.  
These are highly dependent on the shape of the data investigated,
and can safely be ignored here.

```{r executeFlowAI }
# execute flowAI pipeline
execute(pipL_flowAI, path = workDir)
```




# Inspecting results and visualization

## Plotting processing queues as workflow graphs

```{r plotWorkFlows1, fig.cap = "PeacoQC pipeline - scale transformList processing queue"}

# plot work flow graph - PeacoQC - scale transformList
plotCytoPipelineProcessingQueue(
  pipL_PeacoQC,
  whichQueue = "scale transform",
  path = workDir)

```

```{r plotWorkFlows2, fig.cap = "PeacoQC pipeline - file pre-processing queue"}

# plot work flow graph - PeacoQC - pre-processing
plotCytoPipelineProcessingQueue(
  pipL_PeacoQC,
  whichQueue = "pre-processing",
  sampleFile = 1,
  path = workDir)
```

```{r plotWorkFlows3, fig.cap = "flowAI pipeline - scale transformList processing queue"}

# plot work flow graph - flowAI - scale transformList
plotCytoPipelineProcessingQueue(
  pipL_flowAI,
  whichQueue = "scale transform",
  path = workDir)
```

```{r plotWorkFlows4, fig.cap = "flowAI pipeline - file pre-processing queue"}

# plot work flow graph - flowAI - pre-processing

plotCytoPipelineProcessingQueue(
  pipL_flowAI,
  whichQueue = "pre-processing",
  sampleFile = 1,
  path = workDir)
```


## Obtaining information about pipeline generated objects

```{r gettingObjectInfos}
getCytoPipelineObjectInfos(pipL_PeacoQC, 
                           path = workDir,
                           whichQueue = "scale transform")
                                  
getCytoPipelineObjectInfos(pipL_PeacoQC, 
                           path = workDir,
                           whichQueue = "pre-processing",
                           sampleFile = sampleFiles(pipL_PeacoQC)[1])
```

## Retrieving flow frames at different steps and plotting them


```{r gettingFlowFrames}
# example of retrieving a flow frame
# at a given step
ff <- getCytoPipelineFlowFrame(
  pipL_PeacoQC,
  whichQueue = "pre-processing",
  sampleFile = 1,
  objectName = "remove_doublets_obj",
  path = workDir)

#
ff2 <- getCytoPipelineFlowFrame(
  pipL_PeacoQC,
  whichQueue = "pre-processing",
  sampleFile = 1,
  objectName = "remove_debris_obj",
  path = workDir)

```


```{r ggplotEvents1, fig.cap = "1-dimensional distribution plot (forward scatter channel)"}
ggplotEvents(ff, xChannel = "FSC-A")
```

```{r ggplotEvents2, fig.cap = "2-dimensional distribution plot (forward scatter vs. side scatter channels)"}
ggplotEvents(ff, xChannel = "FSC-A", yChannel = "SSC-A")
```

```{r ggplotEvents3, fig.cap = "2-dimensional difference plot between remove_doublets and remove_debris steps"}
ggplotFilterEvents(ff, ff2, xChannel = "FSC-A", yChannel = "SSC-A")
```


## Example of retrieving another type of object

We now provide an example on how to retrieve an object from the cache, that
is not specifically a `flowCore::flowFrame`.  

Here we retrieve a `flowCore::flowSet` object, which represents a set of  
`flowCore::flowFrame`objects, that was obtained after the compensation step 
of the scale transformation processing queue, prior to aggregating the 
two samples.

```{r cacheObjectRetrieve}
obj <- getCytoPipelineObjectFromCache(pipL_PeacoQC,
                                      path = workDir,
                                      whichQueue = "scale transform",
                                      objectName = "compensate_obj")
show(obj)
```


## Interactive visualization

Using the `CytoPipelineGUI` package, it is possible to interactively inspect
results at the different steps of the pipeline, either in the form of 
`flowCore::flowFrame` objects, or `flowCore::transformList`. 
To do this, install the `CytoPipelineGUI` package, and uncomment 
the following code: 

```{r launchShinyApp}
#devtools::install_github("https://github.com/UCLouvain-CBIO/CytoPipelineGUI")
#CytoPipelineGUI::CytoPipelineCheckApp(dir = workDir)
```

# Adding function wrappers - note on the CytoPipelineUtils package

As was described in the previous sections, `CytoPipeline` requires the user to 
provide wrappers to pre-processing functions, as `FUN` parameter of 
`CytoProcessingSteps`. These can be coded by the user themself, or 
come from a built-in function provided in `CytoPipeline` itself.

However, in order to avoid having too many external dependencies for
`CytoPipeline`, another package `CytoPipelineUtils`, is also
[available](https://github.com/UCLouvain-CBIO/CytoPipelineUtils)
`CytoPipelineUtils` is meant to be used in conjunction with `CytoPipeline` 
package. It is a helper package, which is aimed at hosting wrapper 
implementations of various functions of various packages.

`CytoPipelineUtils` is open to contributions. If you want to implement your 
own wrapper of your favourite pre-processing function and use it in a 
`CytoPipeline` object, this is the place to do it! 

# Session information {-}

```{r sessioninfo, echo=FALSE}
sessionInfo()
```


# References {-}