Hadoop with Python: PySpark

Introduction

This is the first part of a series of posts about how to leverage Hadoop (the Distributed Computing Framework) using Python.

The goal of this series of posts is to focus on specific tools and recipes to solve recurrent challenges many Data professionals face, for example:

  • Moving HDFS (Hadoop Distributed File System) files using Python.

  • Loading Data from HDFS into a Data Structure like a Spark or pandas DataFrame in order to make calculations.

  • Writing the results of an analysis back to HDFS.

The first tool in this series is Spark, a framework that defines itself as a unified analytics engine for large-scale data processing.

 

 

PySpark and findspark installation

I encourage you to use conda virtual environments. If you don’t know how to set up conda, please read this post.

First of all, install findspark, a library that helps you integrate Spark into your Python workflow. If you are working on a local computer rather than a proper Hadoop cluster, install pyspark as well.

If you are following this tutorial on a Hadoop cluster, you can skip the PySpark installation.

conda install -c conda-forge findspark -y

# optional, for local setup
conda install -c conda-forge pyspark openjdk -y
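Before moving on, it can help to confirm that the packages are importable from the active environment. The snippet below is a minimal sketch that uses only the standard library; the helper name missing_packages is hypothetical and is not part of findspark or PySpark.

```python
import importlib.util


def missing_packages(names):
    """Return the top-level package names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# findspark is always needed; pyspark only matters for a local (non-cluster) setup.
missing = missing_packages(["findspark", "pyspark"])
if missing:
    print("Not installed in this environment:", ", ".join(missing))
else:
    print("findspark and pyspark are both importable")
```

If a package is reported as missing, the usual cause is that the conda environment used for installation is not the one currently activated.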

Spark setup with findspark

Once findspark is installed, it is time to set up Spark for use in your Python code.

Code for both local and cluster mode is provided here. Use only the call you need and adapt the paths to your particular infrastructure and library versions (the Cloudera Spark path should be very similar to the one provided here):

import findspark

# Local Spark
findspark.init('/home/cloudera/miniconda3/envs/<your_environment_name>/lib/python3.7/site-packages/pyspark/')

# Cloudera Cluster Spark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')

This tutorial has been written using the Cloudera Quickstart VM (a CentOS Linux distribution with a user called cloudera). Remember to adapt the paths to your infrastructure!
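Since the right path depends on the machine, one option is to try several candidate locations and pass the first one that exists to findspark. The following is only a sketch: first_existing_dir and init_spark are hypothetical helper names, and using the SPARK_HOME environment variable as the first candidate is an assumption about your setup.

```python
import os


def first_existing_dir(candidates):
    """Return the first candidate that is an existing directory, or None."""
    for path in candidates:
        if path and os.path.isdir(path):
            return path
    return None


def init_spark(candidates):
    """Call findspark.init with the first candidate directory that exists."""
    import findspark  # imported lazily so first_existing_dir works without it

    spark_home = first_existing_dir(candidates)
    if spark_home is None:
        raise FileNotFoundError("No Spark home found among: %r" % (candidates,))
    findspark.init(spark_home=spark_home)
    return spark_home


# Candidate locations (assumptions): SPARK_HOME first, then the Cloudera parcel path.
candidates = [
    os.environ.get("SPARK_HOME"),
    "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/",
]
print("Spark home candidate found:", first_existing_dir(candidates))
```

Calling init_spark(candidates) would then replace the hard-coded findspark.init call, at the cost of hiding which path was actually picked unless you log the return value.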

Creating a Spark application

Once Spark is initialized, we have to create a Spark application. Execute the following code, making sure you specify the master you need: 'yarn' for a proper Hadoop cluster, or 'local[*]' for a fully local setup:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example_app').master('yarn').getOrCreate()
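If the same notebook has to run both on a laptop and on the cluster, the master can be chosen at runtime instead of edited by hand. The sketch below relies on a heuristic of my own, not on anything Spark does for you: it picks 'yarn' when HADOOP_CONF_DIR or YARN_CONF_DIR is set (Spark on YARN needs one of them to locate the cluster configuration) and falls back to 'local[*]' otherwise. The names choose_master and build_session are hypothetical.

```python
import os


def choose_master(env=None):
    """Pick 'yarn' when a Hadoop client config dir is advertised, else 'local[*]'."""
    env = os.environ if env is None else env
    if env.get("HADOOP_CONF_DIR") or env.get("YARN_CONF_DIR"):
        return "yarn"
    return "local[*]"


def build_session(app_name="example_app", master=None):
    """Create (or reuse) a SparkSession for the given or detected master."""
    # Imported here so choose_master can be used before Spark is initialized.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .master(master or choose_master())
        .getOrCreate()
    )


print("Detected master:", choose_master())
```

With this in place, spark = build_session() would stand in for the explicit builder call, and passing master='local[*]' still lets you force a local run on a cluster node.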

PySpark Recipes and Use Cases

Now that Spark is up and running, let’s use it to interact with Hadoop through some common use cases.

Listing Hive databases

Let’s list the existing databases. I assume you are familiar with the Spark DataFrame API and its methods:

spark.sql("show databases").show()

You should get something like this:

+------------+
|databaseName|
+------------+
|         db1|
|     default|
|     fhadoop|
+------------+
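When the database names are needed as a plain Python list rather than a printed table, the session catalog is an alternative to the SQL statement. This is a small sketch that assumes an existing session named spark; the helpers database_names and database_exists are hypothetical.

```python
def database_names(spark):
    """Return the database names visible to the session, sorted alphabetically."""
    return sorted(db.name for db in spark.catalog.listDatabases())


def database_exists(spark, name):
    """Check whether a database with the given name is visible to the session."""
    return name in database_names(spark)
```

For example, database_exists(spark, 'analytics') could guard code that expects a database to be present before reading from it.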

Transform pandas DataFrame into a Spark DataFrame

The first integration is about moving data from pandas, the de facto standard Python library for in-memory data manipulation, to Spark.

First, let’s load a pandas DataFrame. This one is about air quality in Madrid (just to satisfy your curiosity; the content is not important for moving data from one place to another). You can download it here. Make sure you install the pytables library to read HDF5-formatted data.

import pandas as pd

air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
air_quality_df.head()

This data is a time series of many well-known pollutants, such as NOx and ozone.

 
 

Let’s make some changes to this DataFrame, such as resetting the datetime index so that it is not lost when loading into Spark. The datetime column will also be converted to a string, because Spark can have trouble with dates (related to system locale, time zones, and so on) unless it is configured for your locale.

air_quality_df.reset_index(inplace=True)
air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')
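To see what these two steps do without downloading the dataset, here is the same preparation applied to a tiny synthetic DataFrame. The values are made up for illustration; only the column names (date, NOx, O_3) follow the real data.

```python
import pandas as pd

# Tiny synthetic stand-in for the air quality data (values are invented).
sample_df = pd.DataFrame(
    {"NOx": [10.0, 20.0], "O_3": [1.5, 2.5]},
    index=pd.DatetimeIndex(
        ["2001-07-01 01:00:00", "2001-07-01 02:00:00"], name="date"
    ),
)

# Move the datetime index into a regular column so it travels with the data.
prepared_df = sample_df.reset_index()

# Serialize the timestamps as text so they reach Spark as plain strings.
prepared_df["date"] = prepared_df["date"].dt.strftime("%Y-%m-%d %H:%M:%S")

print(prepared_df)
```

After this, date is an ordinary string column sitting next to the pollutant columns, which is the shape the rest of the recipe expects.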

We can simply load from pandas to Spark with createDataFrame:

air_quality_sdf = spark.createDataFrame(air_quality_df)

Once the DataFrame is loaded into Spark (as air_quality_sdf here), it can be manipulated easily using the PySpark DataFrame API:

air_quality_sdf.select('date', 'NOx').show(5)

Output should look like this:

+-------------------+------------------+
|               date|               NOx|
+-------------------+------------------+
|2001-07-01 01:00:00|            1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows

Create Hive table from Spark DataFrame

To persist a Spark DataFrame in HDFS, where it can be queried using the default Hadoop SQL engine (Hive), one straightforward strategy (not the only one) is to create a temporary view from that DataFrame:

air_quality_sdf.createOrReplaceTempView("air_quality_sdf")

Once the temporary view is created, it can be used from the Spark SQL engine to create a real table using create table as select. Before creating this table, I will create a new database called analytics to store it:

sql_create_database = """
    create database if not exists analytics
    location '/user/cloudera/analytics/'
"""
result_create_db = spark.sql(sql_create_database)

Then, we can create a new table there:

sql_create_table = """
    create table if not exists analytics.pandas_spark_hive
    using parquet
    as select to_timestamp(date) as date_parsed, *
    from air_quality_sdf
"""
result_create_table = spark.sql(sql_create_table)
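If you repeat this pattern for several DataFrames, the statement can be generated from the database, table, and view names. The following is a sketch rather than a general SQL builder: build_ctas is a hypothetical helper, it hard-codes the date column from this example, and it only accepts simple identifiers to avoid pasting arbitrary text into the query.

```python
import re

# Plain identifiers only: letters, digits and underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_ctas(database, table, source_view, file_format="parquet"):
    """Build a 'create table as select' statement from validated identifiers."""
    for name in (database, table, source_view, file_format):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError("Unsafe SQL identifier: %r" % (name,))
    return (
        f"create table if not exists {database}.{table} "
        f"using {file_format} "
        f"as select to_timestamp(date) as date_parsed, * from {source_view}"
    )


sql = build_ctas("analytics", "pandas_spark_hive", "air_quality_sdf")
print(sql)
```

The resulting string can be passed to spark.sql exactly like sql_create_table above.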

Reading data from Hive table using PySpark

Once we have created our Hive table, we can check the results by loading them back with the Spark SQL engine, for example to select the ozone concentration over time:

spark.sql("select * from analytics.pandas_spark_hive") \
    .select("date_parsed", "O_3").show(5)

Output:

+-------------------+------------------+
|        date_parsed|               O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
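To close the loop and bring a few rows back into pandas, the table can also be read by name and converted. This is a minimal sketch assuming the spark session and the table created above; read_columns is a hypothetical helper, and the limit is there because toPandas collects every remaining row into the driver's memory.

```python
def read_columns(spark, table_name, columns, limit=5):
    """Read selected columns of a table and return the first rows as a pandas DataFrame."""
    return (
        spark.table(table_name)   # look the table up by name in the catalog
        .select(*columns)         # keep only the requested columns
        .limit(limit)             # cap the rows before collecting to the driver
        .toPandas()
    )
```

A call such as read_columns(spark, 'analytics.pandas_spark_hive', ['date_parsed', 'O_3']) would fetch the same two columns as the query above, returned as a pandas DataFrame instead of printed.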

I hope you liked this post. In the coming weeks we will release a series of posts about alternative tools you can use to master Hadoop with Python.
