Input to a block can be of two types: atomic and series.
Series: A series input is a list of values that is streamed to the input of the block. RZT SDK treats a series input as a queue. The block can consume and process input values as and when they appear on the input queue, whereas in the case of an atomic input the entire object is available at once when the block code is invoked. To mark an input variable as a series input, define it as type rf.SeriesInput[element_type], where element_type is the data type of each element of the series. One can retrieve values from the queue either by iterating over the input or by calling the .get() method to get the next value in the queue. When the end of the queue is reached, the get method returns an End of Stream token.
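The get-until-end-of-stream pattern described above can be imitated with the Python standard library. The sketch below is not RZT SDK code: `queue.Queue` stands in for the series input, and `END_OF_STREAM` and `drain` are hypothetical names chosen for illustration. It only shows the general shape of a consumer that calls get() until a terminating token appears.

```python
import queue

# Hypothetical sentinel standing in for the SDK's End of Stream token.
END_OF_STREAM = object()


def drain(q):
    """Call get() repeatedly until the end-of-stream sentinel is seen."""
    values = []
    while True:
        item = q.get()  # blocks until a value is available
        if item is END_OF_STREAM:
            break
        values.append(item)
    return values


q = queue.Queue()
for value in (3, 1, 4):
    q.put(value)
q.put(END_OF_STREAM)  # the producer signals that no more values follow

result = drain(q)
print(result)
```

How the real End of Stream token is represented and compared is up to the SDK; the identity check against a sentinel object is an assumption of this sketch.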
In a similar way, one can define atomic or series outputs as well. An output variable defined as type rf.SeriesOutput[element_type] is treated as an output queue and can be connected to the series input of a succeeding block. Although in most cases a series input is fed by the series output of another block, one can provide any iterable, such as a list, as a series input.
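One way to see why a plain list can stand in for a streamed source: a consumer that only iterates over its input does not depend on where the values come from. The sketch below is plain Python rather than SDK code, and `count_values` and `stream` are hypothetical names used only for illustration.

```python
from collections import Counter
from typing import Iterable, Iterator


def count_values(values: Iterable[int]) -> Counter:
    """Count occurrences using nothing but iteration over the input."""
    counts = Counter()
    for value in values:
        counts[value] += 1
    return counts


def stream() -> Iterator[int]:
    """A generator that yields its values one at a time, like a stream."""
    yield from (2, 0, 2)


# The same consumer works for a list and for a lazily produced stream.
from_list = count_values([2, 0, 2])
from_stream = count_values(stream())
print(from_list == from_stream)
```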
The following code takes an atomic string input filename and gives an atomic boolean file_exists as output. The output is True when the file exists in the project space.
import razor.flow as rf
import os
from razor.api import project_space_path

@rf.block
class AtomicInputDemo():
    filename: str
    file_exists: rf.Output[bool]

    def run(self):
        self.logger.info(self.filename)
        self.file_exists.put(os.path.isfile(project_space_path(self.filename)))

rf.Pipeline(targets=[AtomicInputDemo(filename="titanic/test.csv")]).execute()
The following code takes a CSV filename as input, reads the file with pandas, and outputs one row at a time as a pandas Series object.
import razor.flow as rf
from razor.api import project_space_path
import pandas as pd

@rf.block
class CsvReader():
    filename: str
    rows: rf.SeriesOutput[pd.Series]

    def run(self):
        self.logger.info(self.filename)
        df = pd.read_csv(project_space_path(self.filename))
        # iterrows() yields (index, row) pairs; only the row is streamed out.
        for _, row in df.iterrows():
            self.rows.put(row)

rf.Pipeline(targets=[CsvReader(filename="titanic/test.csv")]).execute()
The example below shows how to connect the series output of one block to the series input of another block. In this example, the first block generates random integers and streams them as its output. The succeeding block consumes the streamed integers and keeps a running count of how many times each integer has appeared so far.
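import razor.flow as rf
import random
import time

@rf.block
class RandomNumberGenerator():
    size: int
    out: rf.SeriesOutput[int]

    def run(self):
        for i in range(self.size):
            # randint(0, 5) is inclusive at both ends, so values are 0 to 5.
            self.out.put(random.randint(0, 5))
            # Pause between values so the streaming behaviour is visible.
            time.sleep(2)

@rf.block
class CountNumbers():
    numbers: rf.SeriesInput[int]
    # One counter per possible value (0 to 5).
    counts = [0 for i in range(6)]

    def run(self):
        # Iterating over the series input yields values as they arrive.
        for number in self.numbers:
            self.counts[number] = self.counts[number] + 1
            self.logger.info(f"Digit received {number}. Counts:{self.counts}")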
Create a pipeline and display it:
rnd_num_generator = RandomNumberGenerator(size=20)
count_numbers = CountNumbers(numbers=rnd_num_generator.out)
pipeline = rf.Pipeline(targets=[count_numbers])
pipeline.show()
Run the pipeline and click on the logs tab of the pipeline run widget:
pipeline.execute()
Note that even though there is no delay in the code of the consuming block, its log messages appear with a delay. This is because the for loop in the consumer enters a blocking wait until the next value appears on the series input queue.
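The blocking wait can be reproduced outside the SDK with two threads and a standard-library queue. This is a minimal sketch under stated assumptions, not a description of how RZT SDK schedules blocks: `producer`, `consumer`, `END_OF_STREAM`, and the 0.2 second delay are all hypothetical choices. Only the producer sleeps, yet the consumer's timestamps are spread out because each get() waits for the next value.

```python
import queue
import threading
import time

# Hypothetical sentinel; the SDK's End of Stream token may differ.
END_OF_STREAM = object()


def producer(q, size, delay):
    """Put values on the queue, sleeping after each one."""
    for i in range(size):
        q.put(i)
        time.sleep(delay)  # the only explicit delay in the program
    q.put(END_OF_STREAM)


def consumer(q, arrivals):
    """Record each value together with the time at which it was received."""
    while True:
        item = q.get()  # blocks until the producer puts a value
        if item is END_OF_STREAM:
            break
        arrivals.append((item, time.monotonic()))


q = queue.Queue()
arrivals = []
start = time.monotonic()

t_cons = threading.Thread(target=consumer, args=(q, arrivals))
t_prod = threading.Thread(target=producer, args=(q, 3, 0.2))
t_cons.start()
t_prod.start()
t_prod.join()
t_cons.join()

for item, received_at in arrivals:
    print(f"value {item} consumed {received_at - start:.2f}s after start")
```

The consumer contains no sleep call, so any spacing between its timestamps comes from waiting on the queue.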