DATA PIPELINES – FROM MYSQL TO SPARK WITH PYTHON

Part 1

In these pages we will present a use case explaining how to build data pipelines: fetching data from a MySQL database in Python, retrieving external data related to each user in JSON format, exporting and importing it in CSV format, converting this data into a Spark DataFrame to apply a function, merging the result and sending the processed data back into the MySQL database.

1. Fetching Data from the MySQL Database with Python

To query data in a MySQL database from Python, you need to go through the following steps:

  • Connecting to the MySQL database through a MySQLConnection object.
  • Creating a cursor instance with a MySQLCursor object.
  • Using the cursor to run the query by calling its execute() method.
  • Using the fetchall() method to fetch the data.
  • Closing the cursor with the close() method.

Let’s imagine a (simplified) MySQL table containing, among others, the fields user_id, user_email and user_info.

user_info represents the field we need to process in our later Spark processing.

We will use MySQLdb, which is an interface for connecting to a MySQL database with Python. It implements the Python Database API and is built on top of the MySQL C API. We will be using port 3306, which is the default port for MySQL, so you might not need to specify it.

MySQL REMOTE ACCESS

By default, MySQL is installed listening only on localhost. To grant MySQL remote access, we first of all need to open MySQL to external connections. To do so, first send a GRANT statement to MySQL so that your user is allowed to connect from a remote host.
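The exact statement depends on your MySQL version and users; as a purely illustrative sketch (user name, password and database name below are placeholders), granting remote access on MySQL 5.x could look like:

GRANT ALL PRIVILEGES ON your_db.* TO 'your_user'@'%' IDENTIFIED BY 'your_password';
FLUSH PRIVILEGES;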

After that, don’t forget to edit the following line in the file:

/etc/mysql/my.cnf

and replace the line :

bind-address = 127.0.0.1

by :

bind-address = 0.0.0.0

and restart your server :

/etc/init.d/mysql restart

You should now be able to access your MySQL database remotely.
Let’s go through our Python script in detail. First of all, instantiate your connection with the MySQLdb module:
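A minimal sketch of this step (host, credentials and database name are placeholders to adapt to your own environment):

import MySQLdb

# Open the connection to the (remote) MySQL server, 3306 being the default port
db = MySQLdb.connect(host="192.168.1.10",
                     port=3306,
                     user="your_user",
                     passwd="your_password",
                     db="your_db")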

For the example (and for simplicity), let’s try to fetch the fields user_id, user_email and user_info, limiting the query to the first 10 results sorted by user_id. Don’t forget to import the pandas Python library, which we will be using to create and handle the DataFrame.
For that, you can write your script as follows:
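Since the original listing is not reproduced here, the sketch below shows one way to write it; it reuses the db connection created above, and the table name users is an assumption:

import pandas as pd

# Create the cursor and run the query
cursor = db.cursor()
cursor.execute("""
    SELECT user_id, user_email, user_info
    FROM users
    ORDER BY user_id
    LIMIT 10
""")

# Fetch all rows and load them into a pandas DataFrame
rows = cursor.fetchall()
df = pd.DataFrame(list(rows), columns=["user_id", "user_email", "user_info"])

# Close the cursor once the data has been fetched
cursor.close()

print(df)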

If everything goes well, your output should show the first 10 users with the three selected fields.

Now, let’s imagine a scenario where things are getting more complicated.

Besides our MySQL database, we have another data source from which we need to fetch data to update the user_info field of our MySQL database. Processing is required to apply a function to these data through a Spark cluster. Last thing: this processing requires data from both MySQL and the other data source. These two data sources only have one ID in common, namely user_id.

To complicate the scenario once more (a fairly typical situation you may face in a data job), this external data source can only be queried through an API returning a JSON response.

Usually, an API exposing customer data may look more or less as follows:

http://api.domain.com/contact-info/full-list

To keep it simple and readable, let’s assume that we will have the following response in a JSON format:
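The actual payload is not reproduced here; the sketch below only illustrates the structure assumed in the rest of this article, an extended-user-info list whose entries carry the common uid and the updated user information (all values are placeholders):

{
  "extended-user-info": [
    { "uid": 1, "user_info": "updated info for user 1" },
    { "uid": 2, "user_info": "updated info for user 2" }
  ]
}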

In the following Python script, the processing will:

  1. Send the API Call
  2. Parse the JSON response from the pair extended-user-info
  3. Get the uid, which is the common ID between our Databases
  4. Check if this ID is present in our 1st dataframe
  5. Add a column named update to our 1st dataframe and store/update in it the user_info value coming from the 2nd data source (the API)
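Here is a minimal sketch of such a script, assuming the endpoint shown above, the JSON structure sketched earlier, and df being the pandas DataFrame built from the MySQL query (requests is used for the HTTP call):

import requests

# 1. Send the API call
response = requests.get("http://api.domain.com/contact-info/full-list")

# 2. Parse the JSON response and read the extended-user-info pair
extended_info = response.json()["extended-user-info"]

# Prepare the new column that will receive the updated values
df["update"] = None

for entry in extended_info:
    # 3. Get the uid, the common ID between our two data sources
    uid = entry["uid"]

    # 4. Check whether this ID is present in our 1st dataframe
    mask = df["user_id"] == uid

    # 5. Store/update the user_info value coming from the API,
    #    only for the matching rows
    if mask.any():
        df.loc[mask, "update"] = entry["user_info"]

print(df)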

 

The script has created a new column, but ONLY the rows of the 1st dataframe HAVING corresponding data in the API response have been updated, matched via the common ID. It should then output as shown here below:

Part 2 >>