# Introducing Pipelines

## Pipeline Basics

### Problem

You want to chain blocks together so that the output of one block is fed into another.

### Solution

Up until now, each block was run individually using its `.execute` method. While it is possible to manually collect the output from one block and provide it as an input parameter to the next, to reap the full benefits of interconnecting blocks you'll need to create a pipeline.

A pipeline is essentially a DAG (Directed Acyclic Graph) of blocks, where data flows in only one direction. An example of a pipeline is shown below.

The first step is to define the blocks for the pipeline:

```python
import random

import razor.flow as rf

@rf.block
class GenerateNumbers:
    number_list: rf.SeriesOutput[int]

    def run(self):
        for i in range(1, 100):
            n = random.randint(1, 1000)
            self.number_list.put(n)

@rf.block
class MultiplyByFactor:
    number_list: rf.SeriesInput[int]
    factor: int
    factor_list: rf.SeriesOutput[int]

    def run(self):
        for number in self.number_list:
            res = number * self.factor
            self.factor_list.put(res)

@rf.block
class SumListElements:
    factor_list: rf.SeriesInput[int]

    def run(self):
        factor_sum = 0
        for factor in self.factor_list:
            factor_sum += factor

        print(factor_sum)
```
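
Before wiring these blocks into a pipeline, it can help to see the dataflow they describe as plain Python, independent of the framework. The sketch below replaces the random generation with a fixed range so the result is deterministic:

```python
# Plain-Python equivalent of the three-block dataflow: generate -> multiply -> sum.
def generate_numbers():
    for n in range(1, 100):
        yield n

def multiply_by_factor(numbers, factor):
    for n in numbers:
        yield n * factor

def sum_list_elements(factors):
    return sum(factors)

total = sum_list_elements(multiply_by_factor(generate_numbers(), factor=8))
print(total)  # 8 * (1 + 2 + ... + 99) = 39600
```

The pipeline automates exactly this chaining, while also handling scheduling, transport, and monitoring.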

To create a pipeline, the output of one block is chained as an input to another block. In the following example, the output of `GenerateNumbers` is passed as an input to `MultiplyByFactor`, and the output of `MultiplyByFactor` is passed as an input to `SumListElements`.

A specific output that needs to be passed to the next block can be accessed as an attribute of the previous block's object. As shown in the following pipeline, the number list passed to the `MultiplyByFactor` block is accessed as `generate_numbers.number_list`.

```python
generate_numbers = GenerateNumbers()
multiply_by_factor = MultiplyByFactor(number_list=generate_numbers.number_list, factor=8)
sum_list_elements = SumListElements(factor_list=multiply_by_factor.factor_list)
```

The pipeline can now be constructed using the `Pipeline` API, passing the terminal blocks as targets:

```python
pipeline = rf.Pipeline(targets=[sum_list_elements])
```

The constructed pipeline can be visualised using the `show` API, which provides a graphical representation of the pipeline:

```python
pipeline.show()
```

The pipeline can now be run using the `execute` API:

```python
pipeline.execute()
```

On execution of the pipeline, a widget appears that monitors the execution and provides the user with details of the running pipeline.

### Execution Environment

Pipelines can be executed in two execution environments.

1. Jupyter - in this mode, the pipeline is executed within the confines of the Jupyter notebook.
2. Engine - an Engine is an execution environment provided by the platform that allows users to run pipelines on large, distributed infrastructure. The following example shows how a pipeline can be executed on an engine:
```python
razor.api.engines('name').execute(pipeline)
```

The API needs to be provided with the name of the engine on which the pipeline will run. A pipeline running on an engine can be monitored using a similar widget that provides details of the pipeline run.

### Adaptors

#### Problem

You have found a block that you want to use, but the output of the block is not in the format that you want.

#### Solution

Razor blocks have an `adaptor` attribute to which a function can be passed. This function converts the output of the block into the desired format. For example, you could have a block that generates random integers while the desired output is floats; in that case, an adaptor can be used to convert the integers into floats.

The following example illustrates the above scenario

```python
def convert_int_to_float(number):
    return float(number)

generate_numbers = GenerateNumbers()
# Pass the conversion function to the block's adaptor attribute
generate_numbers.adaptor = convert_int_to_float
generate_numbers.execute()
```
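
Because the adaptor is an ordinary function, it can be sanity-checked in plain Python, independent of the framework, before attaching it to a block:

```python
# The adaptor converts each integer produced by the block into a float.
def convert_int_to_float(number):
    return float(number)

converted = [convert_int_to_float(n) for n in [3, 7, 42]]
print(converted)  # [3.0, 7.0, 42.0]
```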

### Executor

#### Problem

You want to define how the block executes.

#### Solution

Executors define the way a block executes. The platform supports the following executors:

1. Container - the entire block runs in a container.
2. Sub Process Executor - the block runs as a sub-process within the main process.
3. Process Executor - the block runs as a single process.

The executor for a block can be assigned as follows

```python
import razor.flow as rf

@rf.block(executor=rf.ContainerExecutor(cores=1, memory=1024))
class SplitString:
    # Inputs: a series of strings and an atomic delimiter.
    # Atomic inputs are by default initialised to the rf.Input class.
    texts: rf.SeriesInput[str]
    delimiter: str

    # Output: a series of lists
    data: rf.SeriesOutput[list]

    def run(self):
        for text in self.texts:
            result = text.split(self.delimiter)
            self.data.put(result)
```

This defines the executor as a container executor with the specified number of cores and amount of memory.

The other executors are defined as follows

• Sub Process Executor - `rf.SubProcessExecutor`
• Process Executor - `rf.ProcessExecutor`
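
As a framework-free illustration of what a sub-process executor does, the following plain-Python sketch (not part of the platform API) runs a piece of work in a child process instead of the parent process, using the standard `subprocess` module:

```python
import subprocess
import sys

def run_in_subprocess(code: str) -> str:
    """Run a snippet of Python in a child process and return its stdout."""
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()

# The work executes in a separate process, isolated from the parent.
output = run_in_subprocess("print(sum(range(10)))")
print(output)  # 45
```

The platform's executors manage this kind of isolation (and, for containers, resource limits) on your behalf.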

RZT aiOS allows users to specify the following specific types of container executors:

• Spark executor - `rf.SparkExecutor` can be used for running Spark code
• Horovod executor - `rf.HorovodExecutor` is used for running deep-learning training code on distributed GPUs

### Transports

#### Problem

You would like to control how data is transmitted between blocks.

#### Solution

The platform supports a construct called the Transport. A transport defines the mechanism through which data is transmitted between the blocks in a pipeline. The platform supports multiple such transport mechanisms, a few of which are listed below:

1. Kafka Transport - uses Kafka to transport data between blocks. Preferred for use cases where series data is present and data redundancy is important.
2. File Transport - the output of every block is written to a file, and the subsequent blocks read the file to get their input. This transport mechanism is typically used for atomic batch data.
3. ZeroMQ (Socket Transport) - the data between blocks is sent over a socket. Used for fast serial data transfer between blocks where redundancy is not important.
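
As a framework-free illustration of the file-transport idea, the following plain-Python sketch (not part of the platform API) shows a producer writing its output to a file and a consumer reading that file back as its input:

```python
import json
import os
import tempfile

def producer(path):
    # The "block output" is serialised to a file.
    with open(path, "w") as f:
        json.dump([1, 2, 3], f)

def consumer(path):
    # The next "block" reads the file to obtain its input.
    with open(path) as f:
        return json.load(f)

path = os.path.join(tempfile.mkdtemp(), "block_output.json")
producer(path)
print(consumer(path))  # [1, 2, 3]
```

In a real pipeline, the platform manages the file locations and hand-off automatically.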

The transport for a block output can be defined as follows:

```python
import razor.flow as rf

@rf.block(executor=rf.ContainerExecutor(cores=1, memory=1024))
class SplitString:
    # Inputs: a series of strings and an atomic delimiter.
    # Atomic inputs are by default initialised to the rf.Input class.
    texts: rf.SeriesInput[str]
    delimiter: str

    # Output: a series of lists, transported between blocks via Kafka
    data: rf.SeriesOutput[list] = rf.SeriesOutput(transport=rf.KafkaTransport)

    def run(self):
        for text in self.texts:
            result = text.split(self.delimiter)
            self.data.put(result)
```