Creating and publishing blocks and pipelines

This tutorial walks through the steps required to publish custom-built blocks and pipelines to RZT aiOS. A published block can later be imported in a Jupyter notebook and also appears in the IDE pipeline builder. Similarly, a pipeline created in a Jupyter notebook from published blocks can be saved so that it can be accessed later from both the IDE and Jupyter notebooks.

Every block is published under a bundle name, which is analogous to a Python module name. To publish a block, its code must be placed in a specific directory hierarchy. A block can be published with one of two scopes:

  • project scope: A block published with project scope is available only in the project from which it is published. All Python code for a project-scope block should be placed inside the following directory, replacing <bundle_name> with the actual bundle name of the block:

    • __blocks/project/<bundle_name>
  • org scope: A block published with org scope is available in all projects for that tenant. A custom block with org scope should use the following directory structure:

    • __blocks/org/<bundle_name>

The examples below explain how to publish blocks with project scope and how to save a pipeline built from published blocks. To publish with org scope instead, change the directory structure (replace the project directory with org).
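The directory layout for the two scopes can be sketched with the standard library. This is only an illustration: bundle_dir is a hypothetical helper, not part of razor, and a temporary directory stands in for the real workspace root.

```python
import tempfile
from pathlib import Path


# Hypothetical helper (not a razor API): maps a scope and a bundle name
# to the directory layout described above.
def bundle_dir(root: Path, scope: str, bundle_name: str) -> Path:
    if scope not in ("project", "org"):
        raise ValueError(f"unknown scope: {scope!r}")
    return root / "__blocks" / scope / bundle_name


# A temporary directory stands in for the workspace root.
workspace = Path(tempfile.mkdtemp())

project_bundle = bundle_dir(workspace, "project", "Batch_IO")
org_bundle = bundle_dir(workspace, "org", "Batch_IO")

# Create the project-scope bundle with the two files used in this tutorial.
project_bundle.mkdir(parents=True)
(project_bundle / "__init__.py").touch()
(project_bundle / "batch_io.py").touch()

print(project_bundle.relative_to(workspace).as_posix())  # __blocks/project/Batch_IO
print(org_bundle.relative_to(workspace).as_posix())      # __blocks/org/Batch_IO
```

Only the scope segment of the path differs between the two layouts, which is why switching scope amounts to moving the bundle directory.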

1.1 Create the block code

Create a Python file __blocks/project/Batch_IO/batch_io.py with the following code:

import typing as t

import pandas as pd
import razor.flow as rf
from razor.api import project_space_path


@rf.block
class CsvReaderBatch:
    __publish__ = True
    __label__ = "CsvReaderBatch"
    __description__ = "Reads a CSV file from project space and outputs the data as a pandas DataFrame"
    __tags__ = []
    __category__ = 'Batch_IO'

    filename: str
    output: rf.Output[pd.DataFrame]

    def run(self):
        # Resolve the file name relative to the project space
        file_path = project_space_path(self.filename)
        df = pd.read_csv(file_path)
        self.output.put(df)
        
@rf.block
class DfFilterNan:
    __publish__ = True
    __label__ = "DfFilterNan"
    __description__ = "Removes rows containing NaN from a DataFrame"
    __tags__ = []
    __category__ = 'Batch_IO'

    
    df: pd.DataFrame
    output: rf.Output[pd.DataFrame]

    def run(self):
        # Drop every row that contains at least one NaN, modifying the frame in place
        self.df.dropna(axis=0, inplace=True)
        self.output.put(self.df)
        
@rf.block
class CsvWriterBatch:
    __publish__ = True
    __label__ = "CsvWriterBatch"
    __description__ = "Saves a pandas DataFrame as a CSV file in project space"
    __tags__ = []
    __category__ = 'Batch_IO'

    filename: str
    df: pd.DataFrame

    def run(self):
        # Write the DataFrame to the given file in project space
        self.df.to_csv(project_space_path(self.filename))
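Before publishing, the logic inside the three run methods can be checked locally with plain pandas. The sketch below assumes a temporary directory as a stand-in for the project space and uses made-up sample data; it does not import razor.

```python
import tempfile
from pathlib import Path

import pandas as pd

# Stand-in for the project space: a temporary directory. In the real
# blocks, project_space_path(filename) resolves the path instead.
space = Path(tempfile.mkdtemp())
(space / "in.csv").write_text("name,age\nAnn,31\nBob,\nCid,45\n")

# CsvReaderBatch.run: load the CSV into a DataFrame.
df = pd.read_csv(space / "in.csv")

# DfFilterNan.run: drop every row that contains at least one NaN.
df.dropna(axis=0, inplace=True)

# CsvWriterBatch.run: write the result back as CSV.
df.to_csv(space / "out.csv")

print((space / "out.csv").read_text())
```

The output file starts with an unnamed index column, because DataFrame.to_csv writes the index by default. Passing index=False to to_csv omits that column if it is not wanted.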

1.2 Import the blocks

Create a Python file __blocks/project/Batch_IO/__init__.py with the following code:

from .batch_io import CsvReaderBatch, DfFilterNan, CsvWriterBatch

from razor import block_setup
__metadata__ = block_setup(version="0.0.1")
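A quick way to confirm that no block class was left out of __init__.py is to list the names it imports from the bundle's own modules. The sketch below parses the file contents with the standard ast module and does not import razor. relative_imports is a hypothetical helper, and the idea that the re-exported names are the ones the bundle exposes is an assumption based on the publish log, not a documented rule.

```python
import ast
from typing import List

# Contents of __init__.py from the step above, held as a string so that
# nothing is executed and razor does not need to be installed.
init_source = '''
from .batch_io import CsvReaderBatch, DfFilterNan, CsvWriterBatch

from razor import block_setup
__metadata__ = block_setup(version="0.0.1")
'''


# Hypothetical helper (not a razor API).
def relative_imports(source: str) -> List[str]:
    """Return the names brought in through relative imports (level >= 1)."""
    names: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.level >= 1:
            names.extend(alias.asname or alias.name for alias in node.names)
    return names


exported = relative_imports(init_source)
print(exported)  # ['CsvReaderBatch', 'DfFilterNan', 'CsvWriterBatch']
```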

1.3 Publish the blocks

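Run the following in a Jupyter notebook to publish the bundle. The log output follows the call.

import razor.api
from razor import BlockScope

razor.api.blocks.publish(scope=BlockScope.PROJECT, bundle='Batch_IO', overwrite=True)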
INFO:razor.api.impl.block_manager_impl:Packaging the bundle...
INFO:razor.api.impl.block_manager_impl:Publishing block bundle...
INFO:razor.api.impl.block_manager_impl:Block bundle published.
INFO:razor.api.impl.block_manager_impl:Make sure to restart the Jupyter kernel and then you can use the blocks as follows:
INFO:razor.api.impl.block_manager_impl:from razor.project.blocks.Batch_IO import DfFilterNan
INFO:razor.api.impl.block_manager_impl:from razor.project.blocks.Batch_IO import CsvWriterBatch
INFO:razor.api.impl.block_manager_impl:from razor.project.blocks.Batch_IO import CsvReaderBatch

1.4 Create a pipeline using published blocks

Use the custom blocks created and published in the sections above to build a pipeline that reads the CSV file titanic/train.csv from the project space, removes the rows containing NaN values, and writes the result back to the project space. The file must first be uploaded to the project space from the IDE; see the article on uploading a file in project space for how to do this.

import razor.flow as rf
from razor.project.blocks.Batch_IO import CsvReaderBatch, DfFilterNan, CsvWriterBatch

# Each block's output is wired to the input of the next block
csv_reader = CsvReaderBatch(filename="titanic/train.csv")
nan_filter = DfFilterNan(df=csv_reader.output)
csv_writer = CsvWriterBatch(filename="titanic/train_nan_removed.csv", df=nan_filter.output)

pipeline = rf.Pipeline(name="Filter Nan", targets=[csv_writer])
pipeline.show()
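Only the final block is passed in targets, yet the pipeline contains all three blocks. One way this can work is by following input connections backward from each target. The sketch below is a toy model of that idea and not razor's implementation: Node and execution_order are hypothetical names, and how razor actually resolves a pipeline is not stated in this tutorial.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Toy stand-in for a block: a name plus the blocks feeding its inputs."""
    name: str
    upstream: List["Node"] = field(default_factory=list)


def execution_order(targets: List[Node]) -> List[str]:
    """Depth-first walk from the targets; upstream blocks are listed first."""
    order: List[str] = []
    seen = set()

    def visit(node: Node) -> None:
        if id(node) in seen:
            return
        seen.add(id(node))
        for parent in node.upstream:
            visit(parent)
        order.append(node.name)

    for target in targets:
        visit(target)
    return order


csv_reader = Node("CsvReaderBatch")
nan_filter = Node("DfFilterNan", upstream=[csv_reader])
csv_writer = Node("CsvWriterBatch", upstream=[nan_filter])

print(execution_order([csv_writer]))
# ['CsvReaderBatch', 'DfFilterNan', 'CsvWriterBatch']
```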

[Figure: pipeline diagram rendered by pipeline.show()]

1.5 Save the pipeline

razor.api.pipelines.save(pipeline)
INFO:razor.api.impl.pipeline_manager_impl:Registering pipeline with name: `Filter Nan`
INFO:razor.api.impl.pipeline_manager_impl:
Saving pipeline...
INFO:razor.api.impl.pipeline_manager_impl:
Pipeline is valid.

INFO:razor.api.impl.pipeline_manager_impl:Pipeline saved!

1.6 View saved pipelines

The API razor.api.pipelines() can be used to list all saved pipelines.

razor.api.pipelines()

The saved pipeline also appears in the IDE.

[Images: pipeline published 1, pipeline published 2]