Atomic versus series input/output of a block

Input to a block can be of two types:

  • Atomic: An atomic input is a single object, such as an integer, a string, or an instance of any other class.
  • Series: A series input is a list of values that is streamed to the input of the block. The RZT SDK treats a series input as a queue: the block can consume and process values as they appear on the input queue, whereas an atomic input is available in its entirety when the block code is invoked. To mark an input variable as a series input, declare it with type rf.SeriesInput[element_type], where element_type is the data type of each element of the series. Values can be retrieved from the queue in either of two ways:

    1. by treating the input variable as an iterator, or
    2. by calling the .get() method to get the next value in the queue.

When the end of the queue is reached, the .get() method returns an End of Stream token.
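The queue semantics described above can be mimicked with Python's standard queue module. This is only an analogy, not the RZT SDK's implementation: END_OF_STREAM is a hypothetical sentinel standing in for the SDK's End of Stream token, and make_series, consume_as_iterator and consume_with_get are illustrative names. Both consumers read the same values; the iterator form simply hides the end-of-stream check.

```python
import queue

# Hypothetical sentinel standing in for the SDK's End of Stream token.
END_OF_STREAM = object()


def make_series(values):
    """Fill a FIFO queue with values, then close it with the sentinel."""
    q = queue.Queue()
    for v in values:
        q.put(v)
    q.put(END_OF_STREAM)
    return q


def consume_as_iterator(q):
    # iter(callable, sentinel) keeps calling q.get() until it returns END_OF_STREAM.
    return [v for v in iter(q.get, END_OF_STREAM)]


def consume_with_get(q):
    received = []
    while True:
        v = q.get()  # blocks until a value is available
        if v is END_OF_STREAM:
            break
        received.append(v)
    return received


print(consume_as_iterator(make_series([3, 1, 4])))  # [3, 1, 4]
print(consume_with_get(make_series([3, 1, 4])))     # [3, 1, 4]
```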

Atomic or series outputs can be defined in the same way. An output variable declared with type rf.SeriesOutput[element_type] is treated as an output queue and can be connected to the series input of a succeeding block. Although a series input is usually fed by the series output of another block, any iterable, such as a list, can be provided as a series input.
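To illustrate how a series output feeds a series input, and why a plain list can take the place of an upstream block, here is a stdlib-only sketch assuming two concurrently running blocks share one FIFO queue. The thread, the queue, the END_OF_STREAM marker and the helper names are stand-ins chosen for illustration; they are not the RZT SDK's internals.

```python
import queue
import threading

END_OF_STREAM = object()  # hypothetical end-of-stream marker


def produce(out_q, values):
    """Stand-in for a block writing to a series output."""
    for v in values:
        out_q.put(v)
    out_q.put(END_OF_STREAM)  # tell the consumer nothing more will follow


def as_series(q):
    """Turn a queue into an iterator that stops at the end-of-stream marker."""
    return iter(q.get, END_OF_STREAM)


def consume(series):
    """Stand-in for a block reading a series input; accepts any iterable."""
    return [v * 10 for v in series]  # an arbitrary per-element transformation


q = queue.Queue()
worker = threading.Thread(target=produce, args=(q, [1, 2, 3]))
worker.start()
from_stream = consume(as_series(q))  # values are consumed while the producer runs
worker.join()

from_list = consume([1, 2, 3])  # a plain list can replace the upstream block
print(from_stream)  # [10, 20, 30]
print(from_list)    # [10, 20, 30]
```

Because consume only iterates, it cannot tell whether its values came from another thread through a queue or from an in-memory list.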

Example: atomic input and atomic output

The following code takes an atomic string input, filename, and produces an atomic boolean output, file_exists, which is True when the file exists in the project space.

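import razor.flow as rf
import os
from razor.api import project_space_path

@rf.block
class AtomicInputDemo():
    # Atomic input: the whole string is available when run() is invoked
    filename: str
    # Atomic output: a single boolean value
    file_exists: rf.Output[bool]

    def run(self):
        self.logger.info(self.filename)
        # True if the file exists in the project space, False otherwise
        self.file_exists.put(os.path.isfile(project_space_path(self.filename)))

rf.Pipeline(targets=[AtomicInputDemo(filename="titanic/test.csv")]).execute()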
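Because the block's logic is a single function of one atomic value, it can be checked outside a pipeline. The sketch below assumes, without confirmation from the SDK documentation, that project_space_path resolves a relative name against a project root directory; file_exists_in and the temporary directory are hypothetical stand-ins for that root.

```python
import os
import tempfile


def file_exists_in(root, filename):
    # Atomic in, atomic out: the whole filename is available at once,
    # and exactly one boolean is produced.
    return os.path.isfile(os.path.join(root, filename))


with tempfile.TemporaryDirectory() as root:
    # Build a throwaway "project space" containing titanic/test.csv
    os.makedirs(os.path.join(root, "titanic"))
    with open(os.path.join(root, "titanic", "test.csv"), "w") as f:
        f.write("PassengerId\n1\n")
    exists_result = file_exists_in(root, "titanic/test.csv")
    missing_result = file_exists_in(root, "titanic/missing.csv")

print(exists_result)   # True
print(missing_result)  # False
```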

Example: atomic input and series output

The following code takes a CSV filename as input, reads the file with pandas, and outputs one row at a time as a pandas Series object.

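import razor.flow as rf
import pandas as pd
from razor.api import project_space_path

@rf.block
class CsvReader():
    # Atomic input: path of the CSV file relative to the project space
    filename: str
    # Series output: one pandas Series per row of the file
    rows: rf.SeriesOutput[pd.Series]

    def run(self):
        self.logger.info(self.filename)
        df = pd.read_csv(project_space_path(self.filename))
        # iterrows() yields (index, row) pairs; only the row is streamed
        for _, row in df.iterrows():
            self.rows.put(row)

rf.Pipeline(targets=[CsvReader(filename="titanic/test.csv")]).execute()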
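The block above loads the whole file with pd.read_csv before emitting the first row. If the file is large, one possible variation (a sketch, not something the original example does) is to read it in chunks using read_csv's chunksize parameter and yield rows as each chunk arrives. iter_csv_rows is a hypothetical helper; inside run, its rows could be passed to self.rows.put one at a time.

```python
import io

import pandas as pd


def iter_csv_rows(source, chunksize=1000):
    """Yield one row at a time as a pandas Series, reading chunksize rows per chunk."""
    # With chunksize set, read_csv returns an iterator of DataFrames
    # instead of loading the entire file into memory.
    for chunk in pd.read_csv(source, chunksize=chunksize):
        for _, row in chunk.iterrows():
            yield row


csv_text = "PassengerId,Pclass\n892,3\n893,3\n894,2\n"
rows = list(iter_csv_rows(io.StringIO(csv_text), chunksize=2))
print(len(rows))               # 3
print(rows[2]["PassengerId"])  # 894
```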

Feeding a series input from the series output of another block

The example below shows how to connect the series output of one block to the series input of another block. The first block generates random integers between 0 and 5 and streams them as its output. The succeeding block consumes the streaming integers and keeps a running count of how many times each integer has appeared so far.

import razor.flow as rf
import random
import time

@rf.block
class RandomNumberGenerator():
    size: int
    out: rf.SeriesOutput[int]

    def run(self):
        for _ in range(self.size):
            # Stream a random integer between 0 and 5 (inclusive)
            self.out.put(random.randint(0, 5))
            # Wait 2 seconds before producing the next value
            time.sleep(2)

@rf.block
class CountNumbers():
    numbers: rf.SeriesInput[int]

    def run(self):
        # One counter per possible value 0..5, local to this run
        counts = [0] * 6
        # Iterating waits until the next value arrives on the input queue
        for number in self.numbers:
            counts[number] += 1
            self.logger.info(f"Number received {number}. Counts: {counts}")
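Since any iterable can stand in for a series input, the counting logic can be exercised without a pipeline or random input. The sketch below mirrors the counting loop as a plain generator; running_counts is a hypothetical name, and it keeps its counters in a local list, so each call starts from zero.

```python
def running_counts(numbers, num_values=6):
    """Yield (number, counts-so-far) after each value is received."""
    counts = [0] * num_values  # one counter per possible value 0..num_values-1
    for number in numbers:
        counts[number] += 1
        yield number, list(counts)  # copy so earlier snapshots stay unchanged


snapshots = list(running_counts([2, 0, 2, 5]))
for number, counts in snapshots:
    print(f"Number received {number}. Counts: {counts}")
```

Feeding a fixed list makes the result deterministic, which is convenient when checking the logic before wiring it to a random producer.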
            

Create a pipeline and display it

rnd_num_generator = RandomNumberGenerator(size=20)
count_numbers = CountNumbers(numbers=rnd_num_generator.out)
pipeline = rf.Pipeline(targets=[count_numbers])
pipeline.show()


Run the pipeline, then click the Logs tab of the pipeline run widget to see the consumer's messages.

pipeline.execute()


Note that although the consuming block's code contains no delay, its log messages still appear at intervals. This is because the for loop in the consumer blocks until the next value appears on the series input queue, and the producer sleeps for 2 seconds between values.
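The blocking behavior can be reproduced with a thread and a queue from the standard library. In this sketch, which is an analogy rather than the SDK's mechanism, only the producer sleeps (0.2 s here instead of 2 s, to keep it quick), yet the consumer's arrival times come out roughly 0.2 s apart because q.get() waits for each value. END_OF_STREAM and slow_producer are hypothetical names.

```python
import queue
import threading
import time

END_OF_STREAM = object()  # hypothetical end-of-stream marker


def slow_producer(q, values, delay):
    for v in values:
        q.put(v)
        time.sleep(delay)  # the only deliberate delay in the program
    q.put(END_OF_STREAM)


q = queue.Queue()
producer = threading.Thread(target=slow_producer, args=(q, [0, 1, 2], 0.2))
start = time.monotonic()
producer.start()

arrivals = []
for v in iter(q.get, END_OF_STREAM):  # q.get() blocks until a value arrives
    arrivals.append(time.monotonic() - start)
producer.join()

gaps = [later - earlier for earlier, later in zip(arrivals, arrivals[1:])]
print(gaps)
```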