RZT aiOS provides the following two constructs for running Spark code:

razor.SparkBlock
A Spark block can be created by extending this class. The base class SparkBlock initializes a Spark session and keeps it in the attribute self.spark.

razor.SparkExecutor
SparkExecutor is a specialization of ContainerExecutor. The execution environment for a Spark block should be specified as a SparkExecutor.
This guide illustrates how to build a Spark block and execute it as part of a pipeline. The Titanic dataset is a binary classification dataset and can be downloaded from here; place it in the project space as titanic/train.csv.

Define two custom Spark blocks. The first block reads titanic/train.csv from the project space and outputs a Spark dataframe:

```python
import razor.flow as rf
from pyspark.sql.dataframe import DataFrame
from razor import SparkBlock
from razor.api import project_space_path

@rf.block
class ReadCsvProjectSpace(SparkBlock):
    # Atomic input - csv filename relative to the project space
    filename: str
    # Atomic output of type spark DataFrame
    data: rf.Output[DataFrame]

    def run(self):
        # self.spark is the session initialized by the SparkBlock base class
        df = self.spark.read.csv(project_space_path(self.filename))
        df.printSchema()
        self.data.put(df)
```
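By default, spark.read.csv treats every column as a string and reads the header line as an ordinary data row, which is why the printSchema() call above is useful for sanity checking; in PySpark one would typically pass header=True and inferSchema=True to change that. A quick plain-Python sketch of the default behavior, using a made-up sample in the Titanic column layout:

```python
import csv
import io

# Made-up sample following the Titanic CSV column layout.
sample = "PassengerId,Survived,Pclass\n1,0,3\n2,1,1\n"

rows = list(csv.reader(io.StringIO(sample)))
# Like spark.read.csv with default options, every field comes back as a
# plain string, and the header line is just another row of data.
print(rows[0])  # ['PassengerId', 'Survived', 'Pclass']
print(rows[1])  # ['1', '0', '3']
```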
The second block drops rows containing null values from the input dataframe and outputs the cleaned dataframe:

```python
import razor.flow as rf
from pyspark.sql.dataframe import DataFrame
from razor import SparkBlock

@rf.block
class DropNan(SparkBlock):
    # Atomic input of type spark DataFrame
    input_df: DataFrame
    # Atomic output of type spark DataFrame
    out: rf.Output[DataFrame]

    def run(self):
        _df = self.input_df.na.drop()
        self.out.put(_df)
```
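DataFrame.na.drop() with no arguments uses the PySpark default how='any', i.e. it removes every row that has at least one null column. The same filter, sketched in plain Python over hypothetical sample rows:

```python
# Hypothetical sample rows, with None marking missing values.
rows = [
    {"Age": 22.0, "Cabin": None},
    {"Age": 38.0, "Cabin": "C85"},
    {"Age": None, "Cabin": None},
]

# na.drop() with defaults keeps only rows where every column is
# non-null; the equivalent filter in plain Python:
kept = [r for r in rows if all(v is not None for v in r.values())]
print(kept)  # [{'Age': 38.0, 'Cabin': 'C85'}]
```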
Instantiate the blocks, wire them together, and assign each one a SparkExecutor that specifies the Spark cluster resources for its execution:

```python
from razor import SparkExecutor

read_csv = ReadCsvProjectSpace(filename="titanic/train.csv")
read_csv.executor = SparkExecutor(
    master_core=2, master_memory=2048,
    worker=1, worker_core=3, worker_memory=3072)

nan_dropped = DropNan(input_df=read_csv.data)
nan_dropped.executor = SparkExecutor(
    master_core=2, master_memory=2048,
    worker=1, worker_core=3, worker_memory=3072)

pipeline = rf.Pipeline(targets=[nan_dropped])
pipeline.show()
```
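Passing read_csv.data as input_df is what wires the first block's output port to the second block's input, so the pipeline knows to run ReadCsvProjectSpace before DropNan. A toy single-slot channel (not the aiOS implementation) illustrates the put/get dataflow between blocks:

```python
class Output:
    """Toy stand-in for rf.Output: a single-slot channel."""
    def __init__(self):
        self._value = None

    def put(self, value):
        self._value = value

    def get(self):
        return self._value

# The upstream block publishes a result on its output port...
data = Output()
data.put(["row1", "row2", None])

# ...and the downstream block consumes it as its input.
input_df = data.get()
cleaned = [r for r in input_df if r is not None]
print(cleaned)  # ['row1', 'row2']
```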
Run the pipeline:

```python
pipeline.execute()
```