This tutorial walks through the steps required to publish custom-built blocks and pipelines to RZT aiOS. A published block can later be imported in a Jupyter notebook and also appears in the IDE pipeline builder. Similarly, a pipeline created in a Jupyter notebook from published blocks can be saved so that it can be accessed later from both the IDE and Jupyter notebooks.
Every block is published with an associated bundle name, which is analogous to a Python module name. To publish a block, its code must be placed in a specific directory hierarchy. A block can be published with one of two scopes:
project scope: A block published with project scope is available only in the project from which it is published. All Python code for a block with project scope should be placed inside the following directory. Replace <bundle_name> with the actual bundle name of the block.
__blocks/project/<bundle_name>
org scope: A block published with org scope is available in all projects for that tenant. A custom block with org scope should use the following directory structure:
__blocks/org/<bundle_name>
The examples below explain how to publish blocks with project scope and how to save a pipeline built from published blocks. To publish with org scope instead, change the directory from project to org.
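The directory convention described above can be sketched as a small helper. This is an illustrative sketch only and is not part of the razor SDK: the names `bundle_dir` and `scaffold_bundle` and the `root` parameter are hypothetical, it assumes `__blocks` sits at the root of your workspace, and the check that the bundle name is a valid Python identifier is an assumption drawn from the "analogous to a Python module name" remark.

```python
from pathlib import Path

# The two scopes named in this tutorial; anything else is rejected.
VALID_SCOPES = ("project", "org")


def bundle_dir(scope: str, bundle_name: str, root: str = ".") -> Path:
    """Return the directory where a bundle's code is expected to live."""
    if scope not in VALID_SCOPES:
        raise ValueError(f"scope must be one of {VALID_SCOPES}, got {scope!r}")
    # Assumption: a bundle name should be usable as a Python module name.
    if not bundle_name.isidentifier():
        raise ValueError(f"{bundle_name!r} is not a valid module-style name")
    return Path(root) / "__blocks" / scope / bundle_name


def scaffold_bundle(scope: str, bundle_name: str, root: str = ".") -> Path:
    """Create the bundle directory and an empty __init__.py inside it."""
    target = bundle_dir(scope, bundle_name, root)
    target.mkdir(parents=True, exist_ok=True)
    (target / "__init__.py").touch(exist_ok=True)
    return target
```

For example, `bundle_dir("project", "Batch_IO")` resolves to `__blocks/project/Batch_IO`, and passing `"org"` as the scope yields `__blocks/org/Batch_IO`.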
Create a Python file __blocks/project/Batch_IO/batch_io.py with the following code:
import razor.flow as rf
import typing as t
from razor.api import project_space_path
import pandas as pd


@rf.block
class CsvReaderBatch:
    __publish__ = True
    __label__ = "CsvReaderBatch"
    __description__ = "Reads a csv file from project space and output the data as pandas DataFrame object"
    __tags__ = []
    __category__ = 'Batch_IO'

    filename: str
    output: rf.Output[pd.DataFrame]

    def run(self):
        file_path = project_space_path(self.filename)
        df = pd.read_csv(file_path)
        self.output.put(df)


@rf.block
class DfFilterNan:
    __publish__ = True
    __label__ = "DfFilterNan"
    __description__ = "Remove the rows with nan from a data frame"
    __tags__ = []
    __category__ = 'Batch_IO'

    df: pd.DataFrame
    output: rf.Output[pd.DataFrame]

    def run(self):
        self.df.dropna(axis=0, inplace=True)
        self.output.put(self.df)


@rf.block
class CsvWriterBatch:
    __publish__ = True
    __label__ = "CsvWriterBatch"
    __description__ = "Save a pandas dataframe as csv file in project space"
    __tags__ = []
    __category__ = 'Batch_IO'

    filename: str
    df: pd.DataFrame

    def run(self):
        self.df.to_csv(project_space_path(self.filename))
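Before publishing, it can help to exercise the logic of the three `run` methods as plain functions outside the platform. The following is a minimal sketch under stated assumptions: it does not use `razor` at all, a working directory stands in for the project space, the function names (`read_csv_batch`, `filter_nan`, `write_csv_batch`, `run_locally`) are hypothetical, and the sample data is made up for illustration.

```python
import tempfile
from pathlib import Path

import pandas as pd


def read_csv_batch(path: Path) -> pd.DataFrame:
    # Mirrors CsvReaderBatch.run, minus the project-space path lookup.
    return pd.read_csv(path)


def filter_nan(df: pd.DataFrame) -> pd.DataFrame:
    # Same filtering as DfFilterNan.run, but returns a new frame
    # instead of modifying the input in place.
    return df.dropna(axis=0)


def write_csv_batch(df: pd.DataFrame, path: Path) -> None:
    # Mirrors CsvWriterBatch.run. With default arguments, to_csv also
    # writes the DataFrame index as the first column.
    df.to_csv(path)


def run_locally(workdir: Path) -> pd.DataFrame:
    src = workdir / "train.csv"
    dst = workdir / "train_nan_removed.csv"
    # Made-up sample data: the rows for Bob and Cid each have an empty field.
    src.write_text("name,age,fare\nAnn,29,7.25\nBob,,8.05\nCid,41,\nDee,35,13.0\n")
    write_csv_batch(filter_nan(read_csv_batch(src)), dst)
    # Read the result back, treating the first column as the saved index.
    return pd.read_csv(dst, index_col=0)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(run_locally(Path(tmp)))
```

Only the rows without missing values survive, and they keep their original index labels because `dropna` does not renumber the index.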
Create a Python file __blocks/project/Batch_IO/__init__.py with the following code:
from .batch_io import CsvReaderBatch, DfFilterNan, CsvWriterBatch
from razor import block_setup
__metadata__ = block_setup(version="0.0.1")
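Publish the bundle by running the following in a Jupyter notebook:
import razor.api  # binds the name `razor` used in the call below
from razor import BlockScope

razor.api.blocks.publish(scope=BlockScope.PROJECT, bundle='Batch_IO', overwrite=True)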
INFO:razor.api.impl.block_manager_impl:Packaging the bundle...
INFO:razor.api.impl.block_manager_impl:Publishing block bundle...
INFO:razor.api.impl.block_manager_impl:Block bundle published.
INFO:razor.api.impl.block_manager_impl:Make sure to restart the Jupyter kernel and then you can use the blocks as follows:
INFO:razor.api.impl.block_manager_impl:from razor.project.blocks.Batch_IO import DfFilterNan
INFO:razor.api.impl.block_manager_impl:from razor.project.blocks.Batch_IO import CsvWriterBatch
INFO:razor.api.impl.block_manager_impl:from razor.project.blocks.Batch_IO import CsvReaderBatch
Use the custom blocks created and published in the sections above to build a pipeline that reads the CSV file titanic/train.csv uploaded to the project space from the IDE, removes the rows containing NaN values, and writes the result back to the project space. See the article "uploading a file in project space" for how to upload a data file to your project space.
# After restarting the kernel, re-import everything the pipeline code uses.
import razor.api
import razor.flow as rf
from razor.project.blocks.Batch_IO import CsvWriterBatch
from razor.project.blocks.Batch_IO import DfFilterNan
from razor.project.blocks.Batch_IO import CsvReaderBatch

# Connect each block's output to the next block's input.
csv_reader = CsvReaderBatch(filename="titanic/train.csv")
nan_filter = DfFilterNan(df=csv_reader.output)
csv_writer = CsvWriterBatch(filename="titanic/train_nan_removed.csv", df=nan_filter.output)

pipeline = rf.Pipeline(name="Filter Nan", targets=[csv_writer])
pipeline.show()
razor.api.pipelines.save(pipeline)
INFO:razor.api.impl.pipeline_manager_impl:Registering pipeline with name: `Filter Nan`
INFO:razor.api.impl.pipeline_manager_impl:
Saving pipeline...
INFO:razor.api.impl.pipeline_manager_impl:
Pipeline is valid.
INFO:razor.api.impl.pipeline_manager_impl:Pipeline saved!
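Note that only `csv_writer` is passed as a target, even though the pipeline contains three blocks. One way to picture this is that the upstream blocks are reachable by following input connections back from the target. The toy model below illustrates that idea in plain Python; it is not razor's implementation, the `Node` class and `execution_order` function are hypothetical, and it assumes the connections form an acyclic graph.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Toy stand-in for a block: a name plus the nodes feeding its inputs."""
    name: str
    inputs: List["Node"] = field(default_factory=list)


def execution_order(targets: List[Node]) -> List[str]:
    """Return node names so that every node appears after its inputs."""
    order: List[str] = []
    seen = set()

    def visit(node: Node) -> None:
        # Track nodes by identity so shared upstream nodes run only once.
        if id(node) in seen:
            return
        seen.add(id(node))
        for upstream in node.inputs:
            visit(upstream)
        order.append(node.name)

    for target in targets:
        visit(target)
    return order


# Same shape as the tutorial pipeline: reader -> filter -> writer.
reader = Node("CsvReaderBatch")
nan_filter = Node("DfFilterNan", inputs=[reader])
writer = Node("CsvWriterBatch", inputs=[nan_filter])

print(execution_order([writer]))
```

Starting from the writer alone, the depth-first walk reaches the filter and the reader and lists them before the blocks that depend on them.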
The API razor.api.pipelines() can be used to list all saved pipelines.
razor.api.pipelines()
The saved pipeline will also appear in the IDE.