You want to chain blocks together so that the output of one block is fed into another.
Up until now, each block was run individually using its .execute method. While it is possible to manually collect the output of one block and pass it as an input parameter to the next, to reap the full benefits of interconnecting blocks you will need to create a pipeline.
A Pipeline is essentially a DAG (Directed Acyclic Graph) of blocks, in which data flows in only one direction. An example of a pipeline is shown below.
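To make the DAG idea concrete, here is a minimal plain-Python sketch (not the razor API) of how the blocks of the example pipeline would be ordered so that each block runs only after the blocks it depends on. The block names match the example that follows; the edge structure is an assumption for illustration.

```python
from graphlib import TopologicalSorter

# Each key depends on the blocks in its value set
# (i.e. it consumes their output).
dag = {
    "MultiplyByFactor": {"GenerateNumbers"},
    "SumListElements": {"MultiplyByFactor"},
}

# A valid execution order: every block runs after its inputs are ready.
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['GenerateNumbers', 'MultiplyByFactor', 'SumListElements']
```

Because the graph is acyclic, such an order always exists; this is what lets the pipeline engine schedule blocks automatically.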
The first step is to define the blocks for the pipeline, as below.
```python
import random

import razor.flow as rf


@rf.block
class GenerateNumbers:
    number_list: rf.SeriesOutput[int]

    def run(self):
        for i in range(1, 100):
            n = random.randint(1, 1000)
            self.number_list.put(n)


@rf.block
class MultiplyByFactor:
    number_list: rf.SeriesInput[int]
    factor: int
    factor_list: rf.SeriesOutput[int]

    def run(self):
        for number in self.number_list:
            res = number * self.factor
            self.factor_list.put(res)


@rf.block
class SumListElements:
    factor_list: rf.SeriesInput[int]

    def run(self):
        factor_sum = 0
        for factor in self.factor_list:
            factor_sum += factor
        print(factor_sum)
```
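The data flow these three blocks implement can be modelled in plain Python with generators, which behave like the series inputs and outputs above (values stream from one stage to the next). This sketch has no razor dependency and is only meant to show what the chained pipeline computes:

```python
import random


def generate_numbers():
    # 99 random integers, as in GenerateNumbers.run
    for _ in range(1, 100):
        yield random.randint(1, 1000)


def multiply_by_factor(numbers, factor):
    for n in numbers:
        yield n * factor


def sum_list_elements(factors):
    return sum(factors)


# Chain the stages: generate -> multiply -> sum
total = sum_list_elements(multiply_by_factor(generate_numbers(), factor=8))
print(total)
```

Each stage consumes the previous stage's output lazily, one value at a time, which is exactly the streaming behaviour rf.SeriesInput/rf.SeriesOutput provide between blocks.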
To create a pipeline, the output of one block is chained as an input to another block. In the following example, the output of GenerateNumbers is passed as an input to MultiplyByFactor, and the output of MultiplyByFactor is passed as an input to SumListElements.
A specific variable that needs to be passed to the next block can be accessed from the previous block's object. As shown in the following pipeline, the number list passed to the MultiplyByFactor block is accessed as generate_numbers.number_list.
```python
generate_numbers = GenerateNumbers()
multiply_by_factor = MultiplyByFactor(number_list=generate_numbers.number_list, factor=8)
sum_list_elements = SumListElements(factor_list=multiply_by_factor.factor_list)
```
The pipeline can now be constructed using the Pipeline API, as follows.
```python
pipeline = rf.Pipeline(targets=[sum_list_elements])
```
The constructed pipeline can be visualised using the show API, which provides a graphical representation of the pipeline.
The pipeline can now be run using the following API
On execution of the pipeline, a widget appears that monitors the execution and provides the user with the details of the running pipeline.
Pipelines can be executed in two execution environments.
The API needs to be provided with the name of the engine on which the pipeline is going to run. The pipeline running on the engine can likewise be monitored using a similar widget, which provides details of the pipeline run.
You have found a block that you want to use, however the output of the block is not in the format that you want.
Razor blocks have an adaptor attribute to which a function can be passed. This can be a custom function that converts the output of the block into the desired format. For example, you could have a block that generates random integers, while the desired output is a float instead of an int. In that case an adaptor can be used to convert the integers into floats.
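The idea can be modelled in plain Python, independent of the razor API: an adaptor is simply a function applied to each value a block emits before downstream consumers see it. The helper name with_adapter here is hypothetical, for illustration only:

```python
def convert_int_to_float(value):
    return float(value)


def with_adapter(stream, adapter):
    # Apply the adapter to every value flowing out of a block.
    for item in stream:
        yield adapter(item)


ints = [3, 7, 42]                                  # raw block output
floats = list(with_adapter(ints, convert_int_to_float))
print(floats)  # [3.0, 7.0, 42.0]
```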
The following example illustrates this scenario.
```python
def convert_int_to_float(number_list):
    return float(number_list)


generate_numbers = GenerateNumbers()
generate_numbers.number_list = rf.SeriesOutput.set(adapter=convert_int_to_float)
generate_numbers.execute()
```
You want to define how the block executes.
The executors define the way a block executes. The platform supports the following executors.
The executor for a block can be assigned as follows
```python
import razor.flow as rf


@rf.block(executor=rf.ContainerExecutor(cores=1, memory=1024))
class SplitString:
    # Atomic inputs taking default values as str.
    # These inputs are by default initialised to rf.Input
    texts: rf.SeriesInput[str]
    delimiter: str
    # Series output of type list. Provides the results as a list
    data: rf.SeriesOutput[list]

    def run(self):
        for text in self.texts:
            result = text.split(self.delimiter)
            self.data.put(result)
```
This defines the executor as a container executor with the specified number of cores and amount of memory.
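The executor concept itself (the same block body, run under different execution strategies) can be sketched in plain Python. This does not reproduce what ContainerExecutor actually does on the platform (provisioning a container with the requested cores and memory); it only shows the separation between a block's logic and where it runs:

```python
from concurrent.futures import ThreadPoolExecutor


def split_string(texts, delimiter):
    # The block's logic, independent of any executor.
    return [t.split(delimiter) for t in texts]


texts = ["a,b", "c,d,e"]

# Run the block body directly, in-process.
direct = split_string(texts, ",")

# Hand the same block body to a thread-based executor instead.
with ThreadPoolExecutor(max_workers=1) as ex:
    threaded = ex.submit(split_string, texts, ",").result()

print(direct == threaded)  # True: same result, different execution strategy
```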
The other executors are defined as follows
RZT aiOS allows the user to specify the following specific types of container executors.
rf.SparkExecutor can be used for running Spark code
rf.HorovodExecutor is used for running DL training code on distributed GPUs
You, as the user, would like to transmit data between blocks in a particular fashion.
The platform supports a construct called the Transport. A transport defines the mechanism through which data is transmitted between the blocks in a pipeline. The platform supports multiple such transport mechanisms, a few of which are listed as follows.
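The role a transport plays can be modelled in plain Python with a thread-safe queue carrying values from a producer block to a consumer block. This is only an analogy for the mechanism (rf.KafkaTransport, for instance, uses Kafka for this; the platform internals differ):

```python
import queue
import threading

SENTINEL = object()  # marks the end of the series


def producer(out_q):
    # Stands in for a block putting values on a series output.
    for text in ["a,b", "c,d"]:
        out_q.put(text)
    out_q.put(SENTINEL)


def consumer(in_q, results):
    # Stands in for a block iterating over a series input.
    while True:
        item = in_q.get()
        if item is SENTINEL:
            break
        results.append(item.split(","))


q = queue.Queue()  # the "transport" between the two blocks
results = []
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q, results))
t1.start(); t2.start()
t1.join(); t2.join()
print(results)  # [['a', 'b'], ['c', 'd']]
```

Swapping the queue for Kafka, shared memory, or any other channel changes how the data travels without changing either block's logic, which is exactly what choosing a transport on a block does.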
The transports at a block can be defined as follows
```python
import razor.flow as rf


@rf.block(executor=rf.ContainerExecutor(cores=1, memory=1024))
class SplitString:
    # Atomic inputs taking default values as str.
    # These inputs are by default initialised to rf.Input
    texts: rf.SeriesInput[str]
    delimiter: str
    # Series output of type list, transmitted via the Kafka transport
    data: rf.SeriesOutput[list] = rf.SeriesOutput(transport=rf.KafkaTransport)

    def run(self):
        for text in self.texts:
            result = text.split(self.delimiter)
            self.data.put(result)
```