SJSU - Summer 2022 - Big Data Analytics

Session 2

Dr. Julien Pierret

Today's Agenda

  • Prerequisites
    • Installing Python libraries
    • Using a Python templates repo
    • Linters
    • pre-commit
  • Lab 🧪
    • Loading a dataset
    • Calculating Values
    • Adding Columns
    • Predictions
    • Accuracy
    • Pipelines

🧪 Lab - Setup time

  • Copy the template
  • Clone your repo
    
                      git clone https://github.com/dafrenchyman/PythonProjectTemplate.git
                      
  • Follow the instructions given in the README.md
  • Add pyspark to the project
  • Run pip-compile on requirements.txt to add the new pyspark dependency
  • Install the packages
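
  • A hedged sketch of those dependency steps, assuming the template follows the usual pip-tools layout (a requirements.in compiled into a pinned requirements.txt; adjust to whatever the README actually says):

```shell
# Declare the new dependency (file name assumed from the pip-tools convention)
echo "pyspark" >> requirements.in

# Pin pyspark and its transitive dependencies into requirements.txt
pip-compile requirements.in

# Install everything into the active virtualenv
pip install -r requirements.txt
```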

🧪 Lab - Loading the dataset 1

  • Fisher's Iris dataset
  • Load the dataset into a Spark DataFrame
    • Create a main
      
                            import sys
      
      
                            def main():
                                return
      
      
                            if __name__ == "__main__":
                                sys.exit(main())
                            

🧪 Lab - Loading the dataset 2

  • Load the dataset into a Spark DataFrame
    • ...
    • Start Spark
      
                            # Setup Spark
                            spark = SparkSession.builder.master("local[*]").getOrCreate()
                            
      • Automatically getting the import for SparkSession (ALT + ENTER)
        
                                    from pyspark.sql import SparkSession
                                    
    • Load the downloaded CSV
      
                            fisher_df = spark.read.csv(temp_csv_file, inferSchema="true")
                            
      • There's a mistake above!
        • Inspect the source file, does it have a header row?
      • Docs: pyspark.sql.DataFrameReader.csv
      • How do we fix it?
        • Hint: Look at the header option

🧪 Lab - Loading the dataset 3

  • Load the dataset into a Spark DataFrame
    • ...
    • We fixed the header issue
      • Need the column names: Check out the description
      • Need to rename the columns - Hint: .toDF
    • View the DataFrame
      
                            fisher_df.show()
                            

🧪 Lab - Calculating Means

  • Before we can calculate anything: give our DataFrame a temp view
    
                      fisher_df.createOrReplaceTempView("fisher")
                      
  • Calculate the mean (average) of all 4 continuous values
    • Hint: Use SQL!
      
                            fisher_avg_df = spark.sql("""
                              SELECT
                                  AVG(sepal_length) AS avg_sepal_length
                                  , ...
                                FROM fisher"""
                            )
                            
  • Calculate the mean (average) of all 4 continuous values by class
    
                        SELECT
                            class
                            , AVG(sepal_length) AS avg_sepal_length
                            , ...
                          FROM fisher
                          GROUP BY class
                        

🧪 Lab - Adding a new column

  • Add a new column with a random value in it
  • Order the DataFrame by this column
    
                        SELECT
                            ...
                          FROM fisher
                          ORDER BY random_column_name
                        
  • Don't forget to view it!
    
                      fisher_random_df.show()
                      

🧪 Lab - Building a model 1

  • Build a predictive model to predict the class column
  • Steps
    • Need to merge all the "features" into one column: VectorAssembler
      
                            from pyspark.ml.feature import VectorAssembler

                            vector_assembler = VectorAssembler(
                              inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
                              outputCol="features",
                            )
                            fisher_df = vector_assembler.transform(fisher_df)
                            fisher_df.show()
                            
    • Need to properly encode the "class" (response) column to use it: StringIndexer
      
                              from pyspark.ml.feature import StringIndexer

                              label_indexer = StringIndexer(inputCol="class", outputCol="class_idx")
                              label_indexer_model = label_indexer.fit(fisher_df)
                              fisher_df = label_indexer_model.transform(fisher_df)
                              fisher_df.show()
                              

🧪 Lab - Building a model 2

  • Steps
    • ...
    • Running the RandomForestClassifier
      
                          from pyspark.ml.classification import RandomForestClassifier

                          random_forest = RandomForestClassifier(
                              labelCol="class_idx",
                              featuresCol="features",
                          )
                          random_forest_model = random_forest.fit(fisher_df)
                          fisher_df_predicted = random_forest_model.transform(fisher_df)
                          fisher_df_predicted.show()
                          

🧪 Lab - Calculate how accurate the model is

  • Figure out if a prediction is correct
  • Easiest way = SQL
    
                        fisher_df_predicted.createOrReplaceTempView("predicted")
                        fisher_df_accuracy = spark.sql(
                            """
                            SELECT
                                    SUM(correct)/COUNT(*) AS accuracy
                                FROM
                                    (SELECT
                                            CASE WHEN prediction == class_idx THEN 1
                                            ELSE 0 END AS correct
                                        FROM predicted) AS TMP
                                  """
                        )
                        fisher_df_accuracy.show()
                        

🧪 Lab - Wrap into a Pipeline

  • Express all the code we've done into a Spark Pipeline
    
                      from pyspark.ml import Pipeline

                      pipeline = Pipeline(
                        stages=[
                          vector_assembler,
                          label_indexer,
                          random_forest,
                        ]
                      )
                      model = pipeline.fit(fisher_df)
                      fisher_df_predicted = model.transform(fisher_df)
                      fisher_df_predicted.show()
                      

In Summary

  • You just
    • Learned about Linters
    • Learned about pre-commit
    • Loaded a dataset into Spark
    • Calculated some values from the dataset 🧮
    • Added a column to the dataset
    • Built a predictive model
    • Wrapped everything into a Pipeline


👏

Good Luck!!!!