Building and running a Spark block

RZT aiOS provides the following two constructs for running Spark code:

  • razor.SparkBlock: A Spark block is created by extending this class. The base class SparkBlock initializes a Spark session and stores it in the attribute self.spark.
  • razor.SparkExecutor: A specialization of ContainerExecutor. The execution environment of a Spark block should be specified as a SparkExecutor.

This guide illustrates how to build a Spark block and execute it as part of a pipeline.

Prerequisites

  • Upload the Titanic train dataset to the project space at the location titanic/train.csv. The Titanic dataset is a binary classification dataset and can be downloaded from here
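
Before uploading, it can help to confirm locally that the file has a header row and to see how many rows contain empty fields, since the second block in this guide drops rows with null values. The following is a minimal sketch using only the Python standard library. The function name summarize_csv and the three sample rows are invented for illustration; they are not part of razor or of the real dataset.

```python
import csv
import io


def _is_empty(value):
    """True for a missing field or a field that holds only whitespace."""
    return value is None or (isinstance(value, str) and value.strip() == "")


def summarize_csv(lines):
    """Return (column names, data row count, rows with at least one empty field).

    `lines` is any iterable of text lines, for example an open file object.
    """
    reader = csv.DictReader(lines)
    total = 0
    incomplete = 0
    for row in reader:
        total += 1
        if any(_is_empty(value) for value in row.values()):
            incomplete += 1
    return list(reader.fieldnames or []), total, incomplete


# Invented sample in the shape of the Titanic file (not real data)
sample = io.StringIO("PassengerId,Survived,Age\n1,0,22\n2,1,\n3,1,26\n")
columns, total, incomplete = summarize_csv(sample)
print(columns, total, incomplete)  # prints ['PassengerId', 'Survived', 'Age'] 3 1
```

For a real file, pass an open file object instead of the sample, for example with open("train.csv", newline="") as f: summarize_csv(f), where train.csv stands for wherever the download was saved.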

Define the blocks

Define two custom Spark blocks:

  • A block to read the CSV file titanic/train.csv from the project space and output a Spark DataFrame
  • A block to drop rows containing null values from a Spark DataFrame

import razor.flow as rf
from pyspark.sql.dataframe import DataFrame
from razor import SparkBlock
from razor.api import project_space_path

@rf.block
class ReadCsvProjectSpace(SparkBlock):
    # Atomic input: CSV filename relative to the project space
    filename: str
    # Atomic output of type Spark DataFrame
    data: rf.Output[DataFrame]

    def run(self):
        # header=True treats the first line of the file as column names rather than data
        df = self.spark.read.csv(project_space_path(self.filename), header=True)
        df.printSchema()
        self.data.put(df)


# DropNan reuses the rf, DataFrame and SparkBlock imports from the block above

@rf.block
class DropNan(SparkBlock):
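    # Atomic input: the Spark DataFrame produced by an upstream block
    input_df: DataFrame
    # Atomic output: the DataFrame with incomplete rows removed
    out: rf.Output[DataFrame]

    def run(self):
        # na.drop() removes every row that contains at least one null value
        _df = self.input_df.na.drop()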
        self.out.put(_df)
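
To make the effect of na.drop() concrete, the plain-Python sketch below mimics its default behaviour on a few hand-written rows: a row is removed as soon as any of its values is null (None here) or NaN. It is an illustration only and does not need a Spark session. The helper name drop_rows_with_nulls and the sample rows are invented for this example.

```python
import math


def _is_missing(value):
    """True for None and for float NaN, the values that count as missing here."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_rows_with_nulls(rows):
    """Keep only the rows in which no value is missing (the 'any' rule)."""
    return [row for row in rows if not any(_is_missing(v) for v in row.values())]


# Invented rows shaped like a few Titanic columns (not real data)
rows = [
    {"PassengerId": 1, "Age": 22.0, "Cabin": None},
    {"PassengerId": 2, "Age": 38.0, "Cabin": "C85"},
    {"PassengerId": 3, "Age": None, "Cabin": None},
]

clean = drop_rows_with_nulls(rows)
print([row["PassengerId"] for row in clean])  # prints [2]
```

Because a single missing value is enough to discard a row, a sparsely filled column can remove a large share of the data. If that is not wanted, PySpark's na.drop accepts a subset argument that restricts the check to the listed columns.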

Build a pipeline and execute

from razor import SparkExecutor

# Read the CSV from the project space on a Spark executor
read_csv = ReadCsvProjectSpace(filename="titanic/train.csv")
read_csv.executor = SparkExecutor(master_core=2, master_memory=2048, worker=1, worker_core=3, worker_memory=3072)

# Connect the reader's output to the input of DropNan
nan_dropped = DropNan(input_df=read_csv.data)
nan_dropped.executor = SparkExecutor(master_core=2, master_memory=2048, worker=1, worker_core=3, worker_memory=3072)

pipeline = rf.Pipeline(targets=[nan_dropped])
pipeline.show()
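
The pipeline names only nan_dropped as a target, yet read_csv has to run first because its output feeds the input of nan_dropped. The toy sketch below shows one way such an order can be derived by walking back from the targets through their inputs. It is not razor's implementation; the function resolve_execution_order and the upstream dictionary are assumptions made for illustration, and the sketch does not detect cycles.

```python
def resolve_execution_order(targets, upstream):
    """Return every block needed by the targets, with dependencies first.

    `upstream` maps a block name to the names of the blocks whose outputs it consumes.
    """
    order = []
    seen = set()

    def visit(block):
        if block in seen:
            return
        seen.add(block)
        # Visit all producers before recording the consumer
        for dependency in upstream.get(block, []):
            visit(dependency)
        order.append(block)

    for target in targets:
        visit(target)
    return order


# Hypothetical description of the two blocks in this guide
upstream = {"nan_dropped": ["read_csv"], "read_csv": []}
print(resolve_execution_order(["nan_dropped"], upstream))  # prints ['read_csv', 'nan_dropped']
```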

Run the pipeline

pipeline.execute()