Introduction
This is the first part of a series of posts about how to leverage Hadoop (the Distributed Computing Framework) using Python.
The goal of this series of posts is to focus on specific tools and recipes to solve recurrent challenges many Data professionals face, for example:
- Moving HDFS (Hadoop Distributed File System) files using Python.
- Loading Data from HDFS into a Data Structure like a Spark or pandas DataFrame in order to make calculations.
- Writing the results of an analysis back to HDFS.
The first tool in this series is Spark, a framework that defines itself as a unified analytics engine for large-scale data processing.
PySpark and findspark installation
I encourage you to use conda virtual environments. If you don’t know how to set up conda, please read this post.
First of all, install findspark, a library that helps you integrate Spark into your Python workflow, and also pyspark in case you are working on a local computer and not on a proper Hadoop cluster.
If you are following this tutorial on a Hadoop cluster, you can skip the PySpark installation.
conda install -c conda-forge findspark -y
# optional, for local setup
conda install -c conda-forge pyspark openjdk -y
Spark setup with findspark
Once you have installed findspark, it is time to set up Spark for use in your Python code.
Code for both local and cluster mode is provided here. Uncomment the line you need and adapt the paths to your particular infrastructure and library versions (the Cloudera Spark path should be quite similar to the one provided here):
import findspark
# Local Spark
# findspark.init('/home/cloudera/miniconda3/envs/<your_environment_name>/lib/python3.7/site-packages/pyspark/')
# Cloudera Cluster Spark
# findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')
This tutorial was written using the Cloudera Quickstart VM (a CentOS Linux distribution with a user called cloudera). Remember to adapt the paths to your infrastructure!
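Since the Spark home differs between a local conda install and a cluster parcel, one option is to resolve it at runtime instead of editing the script by hand. The following is only a sketch: pick_spark_home is a hypothetical helper (it is not part of findspark), and it assumes that a SPARK_HOME environment variable, when set, should take priority over your own list of candidate paths.

```python
import os


def pick_spark_home(candidates, environ=None):
    """Return the first existing Spark home directory, or None.

    SPARK_HOME (if set) is tried before the candidate paths.
    `candidates` is a list of paths you consider plausible on your machines.
    """
    environ = os.environ if environ is None else environ
    from_env = environ.get("SPARK_HOME")
    paths = ([from_env] if from_env else []) + list(candidates)
    for path in paths:
        # Only accept paths that really exist as directories
        if os.path.isdir(path):
            return path
    return None
```

The returned path can then be passed to findspark.init(spark_home=...). If the helper returns None, none of the paths exist on the current machine and they need to be adjusted.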
Creating a Spark application
Once Spark is initialized, we have to create a Spark application. Execute the following code, making sure you specify the master you need: 'yarn' in the case of a proper Hadoop cluster, or 'local[*]' in the case of a fully local setup:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example_app').master('yarn').getOrCreate()
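If the same script has to run both on a laptop and on the cluster, the master can be chosen at runtime. The sketch below rests on an assumption: that a machine configured as a YARN client exports HADOOP_CONF_DIR or YARN_CONF_DIR, which Spark uses to locate the cluster. choose_master and build_session are hypothetical helper names, and the heuristic may not fit every installation.

```python
import os


def choose_master(environ=None):
    """Return 'yarn' when Hadoop client configuration is visible, else 'local[*]'."""
    environ = os.environ if environ is None else environ
    # Assumption: a YARN client machine exports one of these variables
    if environ.get("HADOOP_CONF_DIR") or environ.get("YARN_CONF_DIR"):
        return "yarn"
    return "local[*]"


def build_session(app_name="example_app", master=None):
    """Create (or reuse) a SparkSession; pyspark is imported only when called."""
    from pyspark.sql import SparkSession

    master = master or choose_master()
    return SparkSession.builder.appName(app_name).master(master).getOrCreate()
```

Calling build_session() with no arguments applies the heuristic, while build_session(master='yarn') forces a specific master.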
PySpark Recipes and Use Cases
Once Spark is up and running, let’s use it to interact with Hadoop through some common use cases.
Listing Hive databases
Let’s get the existing databases. I assume you are familiar with the Spark DataFrame API and its methods:
spark.sql("show databases").show()
You should get something like this:
+------------+
|databaseName|
+------------+
|         db1|
|     default|
|     fhadoop|
+------------+
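The show() call only prints the result. If you need the database names in Python, for example to check whether a database exists before writing to it, the rows can be collected into a list. This is a minimal sketch: list_database_names and database_exists are hypothetical helpers that expect an already created SparkSession.

```python
def list_database_names(spark):
    """Return the database names visible to the session as a plain list."""
    rows = spark.sql("show databases").collect()
    # Read the first column by position: its header is `databaseName` here,
    # but the name may differ in other Spark versions.
    return [row[0] for row in rows]


def database_exists(spark, name):
    """Return True if a database with this exact name is listed."""
    return name in list_database_names(spark)
```

With the databases shown above, list_database_names(spark) would return the three names as strings.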
Transform pandas DataFrame into a Spark DataFrame
The first integration is about how to move data from pandas, the de facto standard Python library for in-memory data manipulation, to Spark.
First, let’s load a pandas DataFrame. This one is about air quality in Madrid (just to satisfy your curiosity; it is not important with regard to moving data from one place to another). You can download it here. Make sure you install the pytables library to read HDF5-formatted data.
import pandas as pd
air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
air_quality_df.head()
This data is a time series for many well-known pollutants like NOx, ozone, and more.
Let’s make some changes to this DataFrame, like resetting the datetime index to avoid losing information when loading into Spark. The datetime column will also be converted to a string, as Spark has some issues working with dates (related to system locale, time zones, and so on) unless it is further configured for your locale.
air_quality_df.reset_index(inplace=True)
air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')
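To see what this format string produces without loading the dataset, here is a small standard-library illustration. The sample timestamp is just the first hour of the series used as an example. Note that the format keeps seconds but drops any sub-second precision.

```python
from datetime import datetime

DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Sample value: 1 July 2001 at 01:00
stamp = datetime(2001, 7, 1, 1, 0, 0)
as_text = stamp.strftime(DATE_FORMAT)
print(as_text)  # 2001-07-01 01:00:00

# Parsing with the same format recovers the original value
assert datetime.strptime(as_text, DATE_FORMAT) == stamp
```

Strings in this year-month-day layout also sort chronologically, which is handy while the column is still text.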
We can simply load from pandas to Spark with createDataFrame:
air_quality_sdf = spark.createDataFrame(air_quality_df)
Once the DataFrame is loaded into Spark (as air_quality_sdf here), it can be manipulated easily using the PySpark DataFrame API:
air_quality_sdf.select('date', 'NOx').show(5)
Output should look like this:
+-------------------+------------------+
|               date|               NOx|
+-------------------+------------------+
|2001-07-01 01:00:00|            1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows
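As a slightly richer example of the DataFrame API, the sketch below keeps only the readings above a threshold and sorts them by date. high_nox_readings is a hypothetical helper, and the default threshold of 400 is an arbitrary value chosen for illustration, not an official air quality limit.

```python
def high_nox_readings(sdf, threshold=400.0, limit=5):
    """Return the first `limit` rows (by date) whose NOx exceeds `threshold`.

    `sdf` is expected to be a Spark DataFrame with `date` and `NOx` columns.
    """
    # where() accepts a SQL expression given as a string
    condition = "NOx > {}".format(float(threshold))
    return (
        sdf.select("date", "NOx")
        .where(condition)
        .orderBy("date")
        .limit(limit)
    )
```

The result is still a Spark DataFrame, so high_nox_readings(air_quality_sdf).show() would print it.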
Create Hive table from Spark DataFrame
To persist a Spark DataFrame into HDFS, where it can be queried using the default Hadoop SQL engine (Hive), one straightforward strategy (not the only one) is to create a temporary view from that DataFrame:
air_quality_sdf.createOrReplaceTempView("air_quality_sdf")
Once the temporary view is created, it can be used from the Spark SQL engine to create a real table using create table as select. Before creating this table, I will create a new database called analytics to store it:
sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""
result_create_db = spark.sql(sql_create_database)
Then, we can create a new table there:
sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet as select
to_timestamp(date) as date_parsed, *
from air_quality_sdf
"""
result_create_table = spark.sql(sql_create_table)
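As mentioned, the temporary view is not the only strategy. Another option is the DataFrame writer API, sketched below. save_as_hive_table is a hypothetical helper. Its default mode "ignore" is my choice to mirror the "if not exists" behaviour of the SQL version, and unlike that query it does not add the date_parsed column.

```python
def save_as_hive_table(sdf, table_name, mode="ignore"):
    """Persist a Spark DataFrame as a Parquet-backed table via the writer API.

    mode="ignore" leaves an existing table untouched; "overwrite" replaces it.
    """
    if mode not in ("append", "overwrite", "ignore", "error"):
        raise ValueError("unsupported save mode: {!r}".format(mode))
    sdf.write.mode(mode).format("parquet").saveAsTable(table_name)
```

For example, save_as_hive_table(air_quality_sdf, 'analytics.pandas_spark_hive') would store the DataFrame under the same table name used above, provided the analytics database already exists.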
Reading data from Hive table using PySpark
Once we have created our Hive table, we can check the results by loading them back with the Spark SQL engine, for example to select the ozone concentration over time:
spark.sql("select * from analytics.pandas_spark_hive") \
    .select("date_parsed", "O_3").show(5)
Output:
+-------------------+------------------+
|        date_parsed|               O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
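To close the loop and bring Hive data back into pandas, a Spark DataFrame can be converted with toPandas(). The sketch below uses a hypothetical helper, table_to_pandas. The max_rows cap is my own precaution, because toPandas() loads everything it receives into the driver's memory.

```python
def table_to_pandas(spark, table_name, columns, max_rows=1000):
    """Read selected columns of a table and return them as a pandas DataFrame.

    `max_rows` limits how many rows are pulled to the driver.
    """
    sdf = spark.table(table_name).select(*columns).limit(max_rows)
    return sdf.toPandas()
```

For instance, table_to_pandas(spark, 'analytics.pandas_spark_hive', ['date_parsed', 'O_3']) would return the ozone series as a pandas DataFrame of at most 1000 rows.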
I hope you liked this post. In the coming weeks we will release a series of posts about alternative tools you can use to master Hadoop with Python.