Movielens Data Engineering :: Underlying Infrastructure Stack

By: Noelle Milton Vega

The underlying infrastructure stack that enables the data engineering illustrated in this Jupyter Notebook, as well as the notebook itself, is hosted on this personal GPU-equipped R&D server: https://jupyter.ai/RnDLab

The stack components are:

  • Two Linux/LXC containers running CentOS 7.x. Named vps00 and vps01, they run, among other things, the following:
    • vps00: Python-3 virtual environment (via Anaconda-3).
    • vps00: Docker CE Engine. (This means that Docker CE and its containers run nested within this LXC container).
    • vps00: Cloudera/CDH 6.x daemons for 1 x MASTER and 1 x WORKER node (run natively, not within Docker)
    • vps00: MySQL 5.x Server Community Edition as the CDH 6.x Hive Metastore (run natively, not within Docker)
    • vps00: A three (qty. 3) node KAFKA cluster as Docker containers (brokers on ports 9092-9094).
    • vps00: A one (qty. 1) node ZOOKEEPER cluster as a Docker container (on port 22181 to avoid conflict with CDH's ZooKeeper on 2181).
    • vps00: Apache Spark 2.x binary distribution. This is simply to leverage its config files. We use PySpark below.
    • vps01: Cloudera/CDH 6.x Manager, which points to Cloudera/CDH 6.x agents running on vps00
  • NOTES:
    • CDH's Kafka, Zookeeper and Spark clusters are unused because their versions are old. Only HDFS and Hive components are used.
    • Notable Conda and PIP Python packages in the aforementioned Python-3 virtual environment include:
      • PySpark 2.x
      • Confluent Kafka 1.x
      • JupyterLab 1.x
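As a quick liveness check of the three Docker-hosted Kafka brokers listed above, run from within the Python-3 virtual environment, here is a minimal sketch using the Confluent Kafka package noted in the list (the vps00:909[2-4] bootstrap addresses are assumed from this stack's port mappings, shown in the docker ps output further below):

# Minimal Kafka liveness check -- a sketch; bootstrap addresses assumed from this stack's port mappings.
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'vps00:9092,vps00:9093,vps00:9094'})
cluster_md = admin.list_topics(timeout=10)        # Raises a KafkaException if no broker answers within 10s.
print('Broker IDs:', sorted(cluster_md.brokers))  # Expect three broker IDs for this 3-node cluster.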
In [1]:
# ====================================================================================================
# Set the operational context of this notebook ...
# ====================================================================================================
import os, sys

JAVA_HOME = '/usr/java/current.d/'      # JAVA HOME. This is a symlink to '/usr/lib/jvm/java-openjdk/'.
os.environ['JAVA_HOME'] = JAVA_HOME     # Set JAVA_HOME environment variable.
os.chdir('/home/nmvega/agile.ds.d/')    # Change to project home directory.

!echo -e "HOST: $(uname -n)\n"                 # Ensure we're on vps00; our LXC, Docker and CDH development host.
!{JAVA_HOME}/bin/java -version                 # Display version in JAVA_HOME.
print('\nPYTHON (VENV):', sys.executable,'\n') # Verify Python Executable is our 'pyvenv.d' Virtual Env.
!pwd; ls -l                                    # Verify Current Working Directory, and display its contents.
HOST: vps00

openjdk version "1.8.0_232"
OpenJDK Runtime Environment (build 1.8.0_232-b09)
OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)

PYTHON (VENV): /home/nmvega/agile.ds.d/stack.d/pyvenv.d/bin/python 

/home/nmvega/agile.ds.d
total 40
-rw-r--r--  1 nmvega nmvega  856 May 23 22:37 app.py
-rw-r--r--  1 nmvega nmvega    0 Feb 27  2019 derby.log
drwxr-xr-x  3 nmvega nmvega 4096 Jan 31  2019 metastore_db
-rw-r--r--  1 nmvega nmvega 4408 Jan 29  2019 NMVEGA.KAFKA.GITHUB.INSTRUCTIONS.README
-rw-r--r--  1 nmvega nmvega 2309 Feb 11  2019 notes.txt
drwxr-xr-x  7 nmvega nmvega 4096 Apr 21  2019 project
-rw-r--r--  1 nmvega nmvega 1378 Jan 29  2019 requirements.txt
drwxr-xr-x 10 nmvega nmvega 4096 Aug  4 01:46 stack.d
-rw-r--r--  1 nmvega nmvega  667 Feb 11  2019 TODO.txt
-rw-r--r--  1 nmvega nmvega  699 Feb  9  2019 venv.context.sh
In [35]:
# ====================================================================================================
# Confirm that Kafka (qty. 3) and Zookeeper (qty. 1) Docker containers are running.
# If not, then issue the following command:
# ====================================================================================================
# ! (cd ~/agile.ds.d/stack.d; docker-compose up -d zookeeper kafka; docker-compose scale kafka=3)
# ====================================================================================================
! docker ps
CONTAINER ID        IMAGE                           COMMAND                  CREATED             STATUS              PORTS                                                 NAMES
b0ac08727ed4        nmvega/kafka:latest             "start-kafka.sh"         15 seconds ago      Up 15 seconds       0.0.0.0:9094->9092/tcp                                stackd_kafka_3
6400c2985c99        nmvega/kafka:latest             "start-kafka.sh"         15 seconds ago      Up 15 seconds       0.0.0.0:9093->9092/tcp                                stackd_kafka_2
9d53ed373c53        nmvega/kafka:latest             "start-kafka.sh"         16 seconds ago      Up 16 seconds       0.0.0.0:9092->9092/tcp                                stackd_kafka_1
1a55b738deb5        wurstmeister/zookeeper:latest   "/bin/sh -c '/usr/sb…"   17 seconds ago      Up 16 seconds       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:22181->2181/tcp   analytics-ZooKeeper
In [36]:
# ----------------------------------------------------------------------------------
# ZooKeeper via CDH6.x at vps00:2181  (Used by CDH/Hadoop services)
# ZooKeeper via Docker at vps00:22181 (Used by the Docker-supplied Kafka, not by CDH6.x)
# Kafka Brokers via Docker at vps00:909[2-4] (Used by the pip-supplied PySpark 2.x, not by CDH6.x)
# ----------------------------------------------------------------------------------
!netstat -an | egrep '2181|909[2-4]' | grep LISTEN
tcp        0      0 0.0.0.0:2181            0.0.0.0:*               LISTEN     
tcp6       0      0 :::9092                 :::*                    LISTEN     
tcp6       0      0 :::9093                 :::*                    LISTEN     
tcp6       0      0 :::22181                :::*                    LISTEN     
tcp6       0      0 :::9094                 :::*                    LISTEN     
In [3]:
# =================================================================================================================
# Note: ${HOME}/.mylogin.cnf has a login-path entry named 'vps00' that resolves to (vps00:3306, nmvega, <password>)
# =================================================================================================================
# =================================================================================================================
# -- Drop 'movielens' database in MySQL in case it exists.
# -- SHOW DATABASES in MySQL to illustrate that 'movielens' doesn't exist.
# =================================================================================================================
!mysql --login-path=vps00 -e 'DROP DATABASE IF EXISTS movielens'
!mysql --login-path=vps00 -e 'SHOW DATABASES'
# =================================================================================================================
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| sakila             |
+--------------------+
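(Aside: a login-path entry like the 'vps00' one assumed above can be created once, from a shell, with mysql_config_editor; it prompts for the password and stores the credentials in ${HOME}/.mylogin.cnf. A one-line sketch:)

mysql_config_editor set --login-path=vps00 --host=vps00 --port=3306 --user=nmvega --password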
In [4]:
# ===============================================================================================
# Data: MovieLens data normalized into various SQL db schemas: https://github.com/ankane/movielens.sql
# wget https://raw.githubusercontent.com/ankane/movielens.sql/master/movielens.sql
# NOTE: UTF8 post-processing was necessary due to this issue: https://bugs.mysql.com/bug.php?id=73242
# ===============================================================================================
!mysql --login-path=vps00 -e 'CREATE DATABASE IF NOT EXISTS movielens' # Create & Populate 'movielens' db.
!mysql --login-path=vps00 movielens < /home/nmvega/agile.ds.d/stack.d/scripts.d/movielens_mysql-utf8mb4.sql
!mysql --login-path=vps00 -e 'SHOW DATABASES'
table_list = !mysql --login-path=vps00 -N movielens -e "show tables;" # Save list of tables.

for table in table_list: # Show first few Rows for each table in MovieLens.
    print(table)
    !mysql --login-path=vps00 movielens -e "SELECT * FROM {table} LIMIT 3"
# ===============================================================================================
+--------------------+
| Database           |
+--------------------+
| information_schema |
| movielens          |
| mysql              |
| performance_schema |
| sakila             |
+--------------------+
genres
+----+-----------+
| id | name      |
+----+-----------+
|  1 | Action    |
|  2 | Adventure |
|  3 | Animation |
+----+-----------+
genres_movies
+----+----------+----------+
| id | movie_id | genre_id |
+----+----------+----------+
|  1 |        1 |        3 |
|  2 |        1 |        4 |
|  3 |        1 |        5 |
+----+----------+----------+
movies
+----+-------------------+--------------+
| id | title             | release_date |
+----+-------------------+--------------+
|  1 | Toy Story (1995)  | 1995-01-01   |
|  2 | GoldenEye (1995)  | 1995-01-01   |
|  3 | Four Rooms (1995) | 1995-01-01   |
+----+-------------------+--------------+
occupations
+----+---------------+
| id | name          |
+----+---------------+
|  1 | Administrator |
|  2 | Artist        |
|  3 | Doctor        |
+----+---------------+
ratings
+----+---------+----------+--------+---------------------+
| id | user_id | movie_id | rating | rated_at            |
+----+---------+----------+--------+---------------------+
|  1 |     196 |      242 |      3 | 1997-12-04 07:55:49 |
|  2 |     186 |      302 |      3 | 1998-04-04 11:22:22 |
|  3 |      22 |      377 |      1 | 1997-11-06 23:18:36 |
+----+---------+----------+--------+---------------------+
users
+----+------+--------+---------------+----------+
| id | age  | gender | occupation_id | zip_code |
+----+------+--------+---------------+----------+
|  1 |   24 | M      |            20 | 85711    |
|  2 |   53 | F      |            14 | 94043    |
|  3 |   23 | M      |            21 | 32067    |
+----+------+--------+---------------+----------+
Initialize the pyvenv-based PySpark-2.4 (created using Anaconda3 Python-3 -- .../stack.d/pyvenv.d).
In [5]:
import sys, os
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, Catalog
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.sql import functions as F # Spark Functions. USAGE: F.col(), F.countDistinct()
from pyspark.sql import types as T     # Spark Types.     USAGE: T.Integer()
from pyspark.sql.types import Row
  # ------------------------------------------
  # Note: Row() in .../pyspark/sql/types.py
  # isn't included in '__all__' list(), so
  # we must import it by name here.
  # ------------------------------------------

PROJECT_BASE_DIR        = '/home/nmvega/agile.ds.d/'
SPARK_WAREHOUSE_DIR     = PROJECT_BASE_DIR + 'stack.d/spark.d/spark-warehouse'
SPARK_CONF_DIR          = PROJECT_BASE_DIR + 'stack.d/spark.d/SPARK.CONF.DIR.d'
SPARK_JARS_IVY          = PROJECT_BASE_DIR + 'stack.d/spark.d/SPARK_JARS_IVY.d' # Cache downloaded JARS here.
SPARK_JARS_REPOSITORIES = ','.join([ 'http://repo.hortonworks.com/content/repositories/releases', ])

os.environ.pop('SPARK_MASTER_HOST', None)      # Since we're using pip/pySpark, these next three environment
os.environ.pop('SPARK_MASTER_PORT', None)      # variables aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME', None)             # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP', None)          # Just in case pySpark 2.x attempts to read this.
os.environ['PYSPARK_PYTHON'] = sys.executable  # Make SPARK Workers/Executors use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre'   # System OpenJDK JRE for our pip/python3/pySpark 2.4 (not CDH's JRE).

os.environ['SPARK_CONF_DIR'] = SPARK_CONF_DIR  # Where spark-defaults.conf; spark-env.sh; log4j; etc. go.
os.environ['HADOOP_CONF_DIR'] = SPARK_CONF_DIR # Where symlinks to CDH's core|hdfs|hive|yarn-site.xml should
                                               # be created. (We use same directory as above). Both of these
                                               # locations can also be set in '(SPARK_CONF_DIR)/spark-env.sh'.

# ===============================================================================================
# Append CDH's Hadoop Native Library directory to LD_LIBRARY_PATH so
# Spark/PySpark can find them when interacting with HIVE/HADOOP/HDFS.
# Spark emits a warning when this path is absent, but it doesn't fail.
# ===============================================================================================
HADOOP_NATIVE_LIBS = '/opt/cloudera/parcels/CDH/lib/hadoop/lib/native/'
os.environ['LD_LIBRARY_PATH'] = os.environ.get('LD_LIBRARY_PATH', '') + ':' + HADOOP_NATIVE_LIBS
# ===============================================================================================

 
# ===============================================================================================
SPARK_JARS_PACKAGES = ','.join([ 'org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
                                 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',
                                 'org.apache.spark:spark-avro_2.11:2.4.0', ])
# ===============================================================================================
spark_conf = SparkConf()
spark_conf.setAll([
    ('spark.master', 'local[*]'),
    ('spark.app.name', 'myApp'),
    ('spark.submit.deployMode', 'client'),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.eventLog.enabled', 'false'),
    ('spark.logConf', 'false'),
    ('spark.driver.bindAddress', '0.0.0.0'),
    ('spark.driver.host', '0.0.0.0'),

    ('spark.jars.repositories', SPARK_JARS_REPOSITORIES),
    ('spark.jars.ivy', SPARK_JARS_IVY),
    ('spark.jars.packages', SPARK_JARS_PACKAGES),

    ('spark.sql.warehouse.dir', SPARK_WAREHOUSE_DIR),
    ('spark.sql.catalogImplementation', 'hive'),
    ('spark.sql.hive.metastore.version', '1.2.1'),

    # [core|hive|hdfs|yarn]-site.xml K/V pairs can be converted into equivalent SparkConf()
    # K/V pairs by prefixing their Keys w/ 'spark.hadoop.'. Also, symlinks to those CDH
    # files can be created in SPARK_CONF_DIR for Spark clients to use as (overridable) defaults.
    ('spark.hadoop.fs.defaultFS', 'hdfs://vps00:8020'),                    # From: 'core-site.xml'
    ('spark.hadoop.hive.metastore.uris', 'thrift://vps00:9083'),           # From: 'hive-site.xml'
    ('spark.hadoop.hive.metastore.warehouse.dir', '/user/hive/warehouse'), # From: 'hive-site.xml'
])
 
spark_sesn          = SparkSession.builder.config(conf=spark_conf).enableHiveSupport().getOrCreate()
spark_ctxt          = spark_sesn.sparkContext
spark_reader        = spark_sesn.read
spark_streamReader  = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")
 
# ===============================================================================================
# Create small arbitrary DataFrame (myDF) and GroupedData (myGDF) instances so we can inspect
# methods() and attributes inside them. Also, register table name for SparkSQL queries.
# ===============================================================================================
myDF  = spark_sesn.createDataFrame([Row(0,1,2), Row(3,1,5), Row(6,1,8)]).toDF('col0', 'col1', 'col2')
myGDF = myDF.select('*').groupBy('col1')
myDF.createOrReplaceTempView('mydf_as_sqltable')
# ===============================================================================================
print(myDF.collect())
myGDF.sum().show()

# ===============================================================================================
#spark_sesn.stop()
# ===============================================================================================
[Row(col0=0, col1=1, col2=2), Row(col0=3, col1=1, col2=5), Row(col0=6, col1=1, col2=8)]
+----+---------+---------+---------+
|col1|sum(col0)|sum(col1)|sum(col2)|
+----+---------+---------+---------+
|   1|        9|        3|       15|
+----+---------+---------+---------+

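Before moving on, a quick sanity check that the 'spark.hadoop.*' K/V pairs above actually reached the JVM-side Hadoop Configuration. This sketch pokes at the private _jsc handle via py4j, so treat it as a debugging aid rather than stable API:

hconf = spark_ctxt._jsc.hadoopConfiguration()  # JVM-side org.apache.hadoop.conf.Configuration object.
print(hconf.get('fs.defaultFS'))               # Expect: hdfs://vps00:8020
print(hconf.get('hive.metastore.uris'))        # Expect: thrift://vps00:9083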
Import MovieLens DB to Hive DW under /user/hive/warehouse/movielens.db, serialized as Parquet files ...

In [31]:
# SQL to HADOOP (sqoop) the MovieLens tables from MySQL to HADOOP/HIVE and serialize them as Parquet files.
# Because we specify '--hive-overwrite' below, this operation is IDEMPOTENT (though the M/R job does take time to
# complete).
#
# Notes:
#  1) I had to manually install 'mysql-connector-java-5.1.*.jar' into vps00:/var/lib/sqoop and /usr/share/java/.
#     https://www.cloudera.com/documentation/enterprise/latest/topics/cm_mc_sqoop1_client.html#topic_13_7
#     (Jar file was obtained from MySQL/Oracle downloads page).
#     -----------------------------------------------------------------------------------------------------
#     Alternatively, issuing 'sudo dnf install mysql-connector-java.noarch' and creating symlinks to it in the
#     aforementioned directories is possible, but that repo version was older.
#     -----------------------------------------------------------------------------------------------------
#     This step was actually performed during my installation of CDH6.x so that its install wizard (for
#     creating the Hive, Hue, Oozie, and Activity-Monitor MySQL databases) could succeed in connecting to MySQL.
#     -----------------------------------------------------------------------------------------------------
#  2) nmvega@vps00$ sudo -u hdfs hdfs dfs -mkdir /user/nmvega
#  3) nmvega@vps00$ sudo -u hdfs hdfs dfs -chown nmvega:nmvega /user/nmvega
# ================================================================================================================ 

# ================================================================================================================ 
# Pre-Sqoop cleanup steps:
# ================================================================================================================ 
# -- Drop 'movielens' database HIVE MetaStore in case it exists.
# -- Also remove its physical HIVE database table files: hdfs://vps00:8020/user/hive/warehouse/movielens.db/
# -- Delete table-artifacts in UNIX and HDFS homedirs from previous sqoop runs.
# -- Issue 'SHOW DATABASES' in HIVE to illustrate that 'movielens' doesn't exist.
# ================================================================================================================ 
!/usr/bin/beeline -n nmvega -u jdbc:hive2://vps00:10000 -e 'DROP DATABASE IF EXISTS movielens CASCADE;' \
            2>&1 | egrep '^\+|^\-|^\|'
!hdfs dfs -rm -R hdfs://vps00:8020/user/hive/warehouse/movielens.db # May already be deleted by DROP cmd above.
#
for t in table_list:
    !rm -f /home/nmvega/agile.ds.d/codegen_{t}.java # Remove previous sqoop table artifacts from our UNIX homedir.
    !hdfs dfs -rm -R -skipTrash /user/nmvega/{t}    # Remove previous sqoop table artifacts from our HDFS homedir.
#
!/usr/bin/beeline -n nmvega --table -u jdbc:hive2://vps00:10000 -e 'show databases' 2>&1 | egrep '^\+|^\-|^\|'
# ================================================================================================================ 
# ================================================================================================================ 
# Sqoop ...
# ================================================================================================================ 
print('SQOOP job start ...')

!unset JAVA_HOME HADOOP_HOME; \
 /usr/bin/beeline -n nmvega -u "jdbc:hive2://vps00:10000" -e "CREATE DATABASE IF NOT EXISTS movielens"; \
 /usr/bin/sqoop import-all-tables \
    -m 1 \
    --connect jdbc:mysql://vps00:3306/movielens \
    --username="nmvega" \
    --password="mysql4M3!" \
    --hive-import \
    --hive-overwrite \
    --hive-database movielens \
    --as-parquetfile \
    --compression-codec=snappy \
    --parquet-configurator-implementation hadoop >/dev/null 2>&1

!rm -f /home/nmvega/agile.ds.d/codegen_*.java # Remove sqoop codegen artifacts from our UNIX homedir.
print('SQOOP job end.')
rm: `hdfs://vps00:8020/user/hive/warehouse/movielens.db': No such file or directory
rm: `/user/nmvega/genres': No such file or directory
rm: `/user/nmvega/genres_movies': No such file or directory
rm: `/user/nmvega/movies': No such file or directory
rm: `/user/nmvega/occupations': No such file or directory
rm: `/user/nmvega/ratings': No such file or directory
rm: `/user/nmvega/users': No such file or directory
+----------------+
| database_name  |
+----------------+
| default        |
+----------------+
SQOOP job start ...
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Connecting to jdbc:hive2://vps00:10000
Connected to: Apache Hive (version 2.1.1-cdh6.3.0)
Driver: Hive JDBC (version 2.1.1-cdh6.3.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
INFO  : Compiling command(queryId=hive_20191031205913_5fd00bd5-ee16-4d3f-b2db-3608cbaaaa1b): CREATE DATABASE IF NOT EXISTS movielens
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=hive_20191031205913_5fd00bd5-ee16-4d3f-b2db-3608cbaaaa1b); Time taken: 0.135 seconds
INFO  : Executing command(queryId=hive_20191031205913_5fd00bd5-ee16-4d3f-b2db-3608cbaaaa1b): CREATE DATABASE IF NOT EXISTS movielens
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20191031205913_5fd00bd5-ee16-4d3f-b2db-3608cbaaaa1b); Time taken: 0.03 seconds
INFO  : OK
No rows affected (0.204 seconds)
Beeline version 2.1.1-cdh6.3.0 by Apache Hive
SQOOP job end.
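(Aside: before kicking off a long import-all-tables run like the one above, sqoop-eval can sanity-check the JDBC connection using the same connect string; a sketch -- the -P flag prompts for the password, so run it from a terminal rather than this notebook:)

sqoop eval --connect jdbc:mysql://vps00:3306/movielens --username nmvega -P \
    --query 'SELECT COUNT(*) FROM ratings'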
In [32]:
# --------------------------------------------------------------------------------------------------------------
# After running the above sqoop(1) command, we issue three commands to show that we end up with:
# --------------------------------------------------------------------------------------------------------------
# 1) A HIVE database/schema named 'movielens',
# 2) Six (qty. 6) tables in the HIVE 'movielens' database/schema, and
# 3) Files in HDFS associated with said HIVE tables. Parquet format was specified this time, but Avro could also be used.
# --------------------------------------------------------------------------------------------------------------
# Note: The 2nd and 3rd commands will emit no output if the above sqoop(1) session wasn't run.
# --------------------------------------------------------------------------------------------------------------
!/usr/bin/beeline --table -n nmvega -u jdbc:hive2://vps00:10000 -e 'show databases' 2>&1 | egrep "^\+|^\-|^\|"
!/usr/bin/beeline --table -n nmvega -u jdbc:hive2://vps00:10000/movielens -e 'show tables' 2>&1 | egrep "^\+|^\-|^\|"
!hdfs dfs -ls -R /user/hive/warehouse | awk "/.parquet$/ {print \$3, \$4, \$8}"
+----------------+
| database_name  |
+----------------+
| default        |
| movielens      |
+----------------+
+----------------+
|    tab_name    |
+----------------+
| genres         |
| genres_movies  |
| movies         |
| occupations    |
| ratings        |
| users          |
+----------------+
nmvega nmvega /user/hive/warehouse/movielens.db/genres/part-m-00000.snappy.parquet
nmvega nmvega /user/hive/warehouse/movielens.db/genres_movies/part-m-00000.snappy.parquet
nmvega nmvega /user/hive/warehouse/movielens.db/movies/part-m-00000.snappy.parquet
nmvega nmvega /user/hive/warehouse/movielens.db/occupations/part-m-00000.snappy.parquet
nmvega nmvega /user/hive/warehouse/movielens.db/ratings/part-m-00000.snappy.parquet
nmvega nmvega /user/hive/warehouse/movielens.db/users/part-m-00000.snappy.parquet
In [8]:
# --------------------------------------------------------------------------------------------------------------
# Next, we illustrate the same as the above, but via Spark/SparkSQL.
# Note: To enable this, the CDH [core|hdfs|hive]-site.xml files are symlinked into ${SPARK_CONF_DIR};
#       and/or their equivalent K/V pairs are set in SparkConf (above). 
# --------------------------------------------------------------------------------------------------------------
(db,table) = ('movielens','genres') 
# --------------------------------------------------------------------------------------------------------------
spark_sesn.sql("SHOW DATABASES").show() # List all HIVE DBs. Among others, 'movielens' will be shown.
# --------------------------------------------------------------------------------------------------------------
spark_sesn.sql("SHOW TABLES").show()    # No DB is specified here, so Spark only shows TEMP TABLES/VIEWS
                                        # created/registered via myDF.createOrReplaceTempView().
# --------------------------------------------------------------------------------------------------------------
spark_sesn.sql("SHOW TABLES IN " + db).show() # When a DB name is specified, Spark looks in the HIVE metastore
                                              # (i.e. in MySQL) for it. It displays what it finds for the
                                              # specified DB -plus- TEMP TABLE/VIEWS.
# --------------------------------------------------------------------------------------------------------------
spark_sesn.sql("SELECT * FROM %s.%s LIMIT 5" % (db,table)).show()
                                              # Issue a query to HIVE DB tables. Again, specifying
                                              # a database name triggers a HIVE MetaStore lookup.
# --------------------------------------------------------------------------------------------------------------
del (db,table)
+------------+
|databaseName|
+------------+
|     default|
|   movielens|
+------------+

+--------+----------------+-----------+
|database|       tableName|isTemporary|
+--------+----------------+-----------+
|        |mydf_as_sqltable|       true|
+--------+----------------+-----------+

+---------+----------------+-----------+
| database|       tableName|isTemporary|
+---------+----------------+-----------+
|movielens|          genres|      false|
|movielens|   genres_movies|      false|
|movielens|          movies|      false|
|movielens|     occupations|      false|
|movielens|         ratings|      false|
|movielens|           users|      false|
|         |mydf_as_sqltable|       true|
+---------+----------------+-----------+

+---+----------+
| id|      name|
+---+----------+
|  1|    Action|
|  2| Adventure|
|  3| Animation|
|  4|Children's|
|  5|    Comedy|
+---+----------+

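The same catalog information is also available programmatically through the Catalog API imported earlier, avoiding SQL strings altogether; a brief sketch:

print([db.name for db in spark_sesn.catalog.listDatabases()])        # ['default', 'movielens']
print([t.name for t in spark_sesn.catalog.listTables('movielens')])  # The six HIVE tables (+ any TEMP VIEWS).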
In [9]:
# =================================================================================================================
# Save the base URI for the MovieLens DB in the HIVE DataWarehouse for later use ...
# =================================================================================================================
db_name = 'movielens.db'
dw_base_URI =  spark_conf.get('spark.hadoop.fs.defaultFS')                 # Start:  hdfs://vps00:8020
dw_base_URI += spark_conf.get('spark.hadoop.hive.metastore.warehouse.dir') # Append: /user/hive/warehouse
dw_base_URI += '/' + db_name + '/'                                         # Append: /movielens.db/
dw_base_URI  # hdfs://vps00:8020/user/hive/warehouse/movielens.db/
# =================================================================================================================
Out[9]:
'hdfs://vps00:8020/user/hive/warehouse/movielens.db/'
Using pySpark 2.4 (via our conda virtual environment), read the Parquet files from Hive/HDFS (on vps00) that represent the MovieLens database tables. This PySpark is external to the Cloudera/CDH6.x cluster, running as a single-node local[*] cluster (spread across all available CPU cores).
In [10]:
# READ:  DataFrameReader.format(...).option("k","v").schema(...).load()
# WRITE: DataFrameWriter.format(...).option("k","v").partitionBy(...).bucketBy(...).sortBy(...).save()

# =====================================================================================================
# Read each table's Parquet files in HDFS/HIVE into respective Spark DataFrames.
# Note: .read(), .format(), .option() and .schema() all return a DataFrameReader() object.
#       .load() finally returns a DataFrame object. (Note: By default, schema inference is used below).
#       If many options are needed, use .options(k1=v1, k2=v2, ...) instead of chaining .option() calls.
# =====================================================================================================
genres_movies_df = spark_sesn.read.format('parquet').option('path', dw_base_URI + 'genres_movies/*.parquet').load()
occupations_df   = spark_sesn.read.format('parquet').option('path', dw_base_URI + 'occupations/*.parquet').load()
ratings_df       = spark_sesn.read.format('parquet').option('path', dw_base_URI + 'ratings/*.parquet').load()
genres_df        = spark_sesn.read.format('parquet').option('path', dw_base_URI + 'genres/*.parquet').load()
movies_df        = spark_sesn.read.format('parquet').option('path', dw_base_URI + 'movies/*.parquet').load()
users_df         = spark_sesn.read.format('parquet').option('path', dw_base_URI + 'users/*.parquet').load()
#
df_dict = { df: globals()[df] for df in ['genres_movies_df', 'occupations_df', 'ratings_df',
                                         'genres_df', 'movies_df', 'users_df'] } # Just for fun. =:)

for (df_name,df) in df_dict.items(): # See page-183 of Spark Definitive Guide book.
    if '_corrupt_record' in df.columns:
        print('%s: Has corrupt records!' % df_name)
# =====================================================================================================
# Note: The above DataFrames have no content in them that explicitly correlates them to the 'movielens'
# table that each corresponds to. We only know this from the identifier names we assigned to them, but
# those won't help us when, say, we pass those objects around to methods/functions. This is where
# DataFrame column meta-data can help us, which we apply in the next cell.
# =====================================================================================================
genres_df.show(4, False)
df_dict['genres_df'].show(4, False)
# =====================================================================================================
+---+----------+
|id |name      |
+---+----------+
|1  |Action    |
|2  |Adventure |
|3  |Animation |
|4  |Children's|
+---+----------+
only showing top 4 rows

+---+----------+
|id |name      |
+---+----------+
|1  |Action    |
|2  |Adventure |
|3  |Animation |
|4  |Children's|
+---+----------+
only showing top 4 rows

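(Aside: the globals() indirection above is just for fun; the same dict() of DataFrames could be built directly, e.g. this equivalent sketch using the DataFrameReader's parquet() shorthand:)

table_names = ['genres_movies', 'occupations', 'ratings', 'genres', 'movies', 'users']
df_dict = { t + '_df': spark_sesn.read.parquet(dw_base_URI + t + '/*.parquet') for t in table_names }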
In [11]:
# =====================================================================================================
# This step adds metadata to the first (left-most) column, which for each of these happens to be
# column-name 'id'. The metadata we're adding is the 'movielens' table-name, since the source parquet
# files do not have this information embedded in them.
# =====================================================================================================
# There are two ways to do this (both equivalent in result):
# =====================================================================================================
# 1) Using df.select(): This is longer (imperative), but more obvious as to what is being done.
# 
# genres_movies_df = genres_movies_df.select(
#                         genres_movies_df.id.alias('id', metadata = {'table':'genres_movies'}),
#                        *(genres_movies_df.columns[1:]) )
# ----------------------------------------------------------------------------------------------
# 2) Using df.withColumn(): This is shorter (declarative), and takes advantage of Spark letting you substitute
#                           a column for itself, while also returning the others (unmodified), with no need
#                           to explicitly specify them (and, in fact, with df.withColumn() you cannot).
#
#ratings_df       = ratings_df.withColumn('id', ratings_df.id.alias(None, metadata = {'table':'ratings'}))
#genres_df        = genres_df.withColumn('id', genres_df.id.alias(None, metadata = {'table':'genres'}))
#movies_df        = movies_df.withColumn('id', movies_df.id.alias(None, metadata = {'table':'movies'}))
#users_df         = users_df.withColumn('id', users_df.id.alias(None, metadata = {'table':'users'}))
#genres_movies_df = (same pattern as above)
#occupations_df   = (same pattern as above)
# ----------------------------------------------------------------------------------------------
# The following code programmatically implements the above six statements ...
# ----------------------------------------------------------------------------------------------
df_dict = {}
tables = ['genres_movies', 'occupations', 'ratings', 'genres', 'movies', 'users']
for table in tables:
    df = globals()[table + '_df'] # Recover DF from its identifier: E.g. movies_df, genres_movies_df, ...
    colName = df.columns[0]       # Get name of left-most/1st col. That's where we'll attach metadata to each DF.
    df = df.withColumn(colName, df[colName].alias(None, metadata = {'table':table}))
    df_dict[table] = df
# ----------------------------------------------------------------------------------------------
del (df,)
print('DONE!')
# =====================================================================================================
DONE!
In [12]:
# =====================================================================================
# Sample our data and the metadata embedded in the 'id' column (added above) ...
# =====================================================================================
for t in tables:
    print('df_dict[%s] first-column METADATA:' % t,
           df_dict[t].schema[df_dict[t].columns[0]].metadata)
print()
for table,df in df_dict.items():
    print(table)
    df.show(3)
# =====================================================================================
df_dict[genres_movies] first-column METADATA: {'table': 'genres_movies'}
df_dict[occupations] first-column METADATA: {'table': 'occupations'}
df_dict[ratings] first-column METADATA: {'table': 'ratings'}
df_dict[genres] first-column METADATA: {'table': 'genres'}
df_dict[movies] first-column METADATA: {'table': 'movies'}
df_dict[users] first-column METADATA: {'table': 'users'}

genres_movies
+---+--------+--------+
| id|movie_id|genre_id|
+---+--------+--------+
|  1|       1|       3|
|  2|       1|       4|
|  3|       1|       5|
+---+--------+--------+
only showing top 3 rows

occupations
+---+-------------+
| id|         name|
+---+-------------+
|  1|Administrator|
|  2|       Artist|
|  3|       Doctor|
+---+-------------+
only showing top 3 rows

ratings
+---+-------+--------+------+------------+
| id|user_id|movie_id|rating|    rated_at|
+---+-------+--------+------+------------+
|  1|    196|     242|     3|881240149000|
|  2|    186|     302|     3|891706942000|
|  3|     22|     377|     1|878876316000|
+---+-------+--------+------+------------+
only showing top 3 rows

genres
+---+---------+
| id|     name|
+---+---------+
|  1|   Action|
|  2|Adventure|
|  3|Animation|
+---+---------+
only showing top 3 rows

movies
+---+-----------------+------------+
| id|            title|release_date|
+---+-----------------+------------+
|  1| Toy Story (1995)|788936400000|
|  2| GoldenEye (1995)|788936400000|
|  3|Four Rooms (1995)|788936400000|
+---+-----------------+------------+
only showing top 3 rows

users
+---+---+------+-------------+--------+
| id|age|gender|occupation_id|zip_code|
+---+---+------+-------------+--------+
|  1| 24|     M|           20|   85711|
|  2| 53|     F|           14|   94043|
|  3| 23|     M|           21|   32067|
+---+---+------+-------------+--------+
only showing top 3 rows

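With that metadata in place, a DataFrame passed to any method/function can now self-identify. A small sketch (the helper name table_of() is ours, purely illustrative):

def table_of(df):
    """Recover the 'movielens' table name from the metadata on a DataFrame's first column."""
    return df.schema[df.columns[0]].metadata.get('table')

print(table_of(df_dict['ratings']))  # ratings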
In [13]:
# =====================================================================================================
# Next, using the data just loaded into Spark DataFrames, write it back to HDFS ...
# =====================================================================================================
OUTPUT_HDFS_PATH = 'hdfs://vps00:8020/tmp/movielens' # Directory on HDFS (not UFS) on vps00.
genres_df.write.option('compression','snappy').mode('overwrite').format('parquet').save(OUTPUT_HDFS_PATH) # --OR--
#genres_df.write.option('compression','snappy').mode('overwrite').parquet(OUTPUT_HDFS_PATH)
# =====================================================================================================

# =====================================================================================================
# Show results. First line of content came via SQOOP. Second line came via the DF .save() above ...
# =====================================================================================================
!hdfs dfs -ls -R /user/hive/warehouse/movielens.db/genres/*.parquet | awk "/.parquet$/ {print \$3, \$5, \$8}"
!hdfs dfs -ls -R /tmp/movielens | awk "/.parquet$/ {print \$3, \$5, \$8}"
# =====================================================================================================
nmvega 946 /user/hive/warehouse/movielens.db/genres/part-m-00000.snappy.parquet
nmvega 843 /tmp/movielens/part-00000-50e55452-9761-4d59-b7f3-8e2496ffccd3-c000.snappy.parquet
In [14]:
# =====================================================================================================
# The outputs above show that the SOURCE and DEST parquet files in HDFS are not the same size.
# This is likely due to additional snappy compression by Spark when writing to DEST.
# However, we can confirm no data-corruption has occurred by comparing the HASH of each one ...
# =====================================================================================================
SRC = !hdfs dfs -ls -C /user/hive/warehouse/movielens.db/genres/*.parquet # Returns a list()
DST = !hdfs dfs -ls -C /tmp/movielens/*.parquet # Returns a list().
SRC = 'hdfs://' + SRC[0] # hdfs:///user/hive/warehouse/movielens.db/genres/<SrcFileName>.parquet
DST = 'hdfs://' + DST[0] # hdfs:///tmp/movielens/<DstFileName>.parquet
# -----------------------------
parquet_tools = 'hadoop jar /opt/cloudera/parcels/CDH/jars/parquet-tools-*.jar'
!{parquet_tools} cat {SRC} 2> /dev/null | md5sum # Should give same output as next statement.
!{parquet_tools} cat {DST} 2> /dev/null | md5sum # Should give same output as prev statement.
# =====================================================================================================
print('DONE!')
9cfa87aae92297340ddced73088c3efa  -
9cfa87aae92297340ddced73088c3efa  -
DONE!
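A format-agnostic alternative to hashing the raw files is to compare them at the DataFrame level; .exceptAll() (new in Spark 2.4) returns the rows of one DataFrame not present in the other, respecting duplicates. A sketch, reusing SRC and DST from above:

src_df = spark_sesn.read.parquet(SRC)
dst_df = spark_sesn.read.parquet(DST)
identical = (src_df.exceptAll(dst_df).count() == 0 and
             dst_df.exceptAll(src_df).count() == 0)
print('Row-level identical:', identical)  # Expect: True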
In [15]:
# =====================================================================================================
# ANALYTIC: For each Year/Month PAIR, print the count of ratings >= 4.
# =====================================================================================================
# Ratings table: a few rows, for visual reference, ahead of the transformations below ...
# =====================================================================================================
#   +---+-------+--------+------+------------+
#   | id|user_id|movie_id|rating|    rated_at|
#   +---+-------+--------+------+------------+
#   |  1|    196|     242|     3|881240149000| <--- EPOCH time format (a.k.a. UNIX TIME format).
#   |  2|    186|     302|     3|891706942000|
#   |  3|     22|     377|     1|878876316000|
#   +---+-------+--------+------+------------+
# =====================================================================================================

df = df_dict['ratings'] # Retrieve the 'ratings' table DataFrame.
# =====================================================================================================
# The following forms a column expression that converts EPOCH-time to 'TimeStamp' format.
#   -- E.g. 881240149000 --TO--> 1997-12-04 07:55:49 (F.from_unixtime())
#   -- Dividing by 1000 converts milliseconds to seconds.
# =====================================================================================================
dateTime_col = F.from_unixtime((df['rated_at'].cast('BIGINT')/1000)).cast('TIMESTAMP').alias('dateTime')
# =====================================================================================================
df = df.select(dateTime_col, 'rating') # Add our dateTime column and filter-out superfluous COLUMNS.
df = df.where('rating >= 4')           # Filter-out superfluous ROWS. Both follow the filter-first best practice.
                                       # We could chain these, but splitting them keeps this readable; Catalyst
                                       # optimizes either way. :)
# =====================================================================================================
gdf = df.groupBy(F.year('dateTime').alias('year'),
                 F.month('dateTime').alias('month'))    # Group by Year/Month pair (window) so we can count.
                                                        # F.year() extracts the YEAR from a TIMESTAMP.
                                                        # F.month() extracts the MONTH from a TIMESTAMP.

df = gdf.count().withColumnRenamed('count', '4+_count') # .count() within Year-Month groups/buckets.
df = df.sort(F.asc('year'), F.asc('month'))             # NOTE: .orderBy() is an alias for .sort()
df.show()
# =====================================================================================================
del (df, gdf)
+----+-----+--------+
|year|month|4+_count|
+----+-----+--------+
|1997|    9|    3832|
|1997|   10|    5979|
|1997|   11|   13623|
|1997|   12|    6686|
|1998|    1|    7299|
|1998|    2|    5757|
|1998|    3|    7232|
|1998|    4|    4967|
+----+-----+--------+

In [16]:
# =====================================================================================================
# ANALYTIC: For each Year/Month PAIR, print the count of ratings >= 4.
# This is the same ANALYTIC as above, but implemented:
#    1) Via MySQL canonical source.
#    2) Via SparkSQL using a registered TEMPORARY VIEW.
#    3) Via SparkSQL using the HIVE 'movielens' database (and meta-store).
# 
# Note: The MySQL way, being our canonical data-store, has the benefit of also validating:
#    A) The DataFrame query results performed just immediately above,
#    B) The Sqoop/HDFS/HIVE/MetaStore import results earlier,
#    C) The SparkSQL against TEMP-VIEW results (performed below).
#    D) The SparkSQL against HIVE results (performed below).
# =====================================================================================================

# =====================================================================================================
# 1 of 3) Via MySQL canonical source ...
# =====================================================================================================
!mysql --login-path=vps00 --table movielens -e \
                            'SELECT YEAR(rated_at) AS year, \
                                    MONTH(rated_at) AS month, \
                                    COUNT(*) AS count \
                             FROM ratings \
                             WHERE rating >= 4 \
                             GROUP BY YEAR(rated_at), MONTH(rated_at) \
                             ORDER BY YEAR(rated_at) ASC, MONTH(rated_at) ASC'
# =====================================================================================================
+------+-------+-------+
| year | month | count |
+------+-------+-------+
| 1997 |     9 |  3832 |
| 1997 |    10 |  5979 |
| 1997 |    11 | 13623 |
| 1997 |    12 |  6686 |
| 1998 |     1 |  7299 |
| 1998 |     2 |  5757 |
| 1998 |     3 |  7232 |
| 1998 |     4 |  4967 |
+------+-------+-------+
In [17]:
# =====================================================================================================
# 2 of 3) Via SparkSQL using a registered TEMPORARY VIEW (df.createOrReplaceTempView())...
# =====================================================================================================
df = df_dict['ratings'] # Recover 'ratings' DF from its dict() key.
dateTime_col = F.from_unixtime((df['rated_at'].cast('BIGINT')/1000)).cast('TIMESTAMP')
df = df.withColumn('dateTime', dateTime_col) # EPOCH time to TIMESTAMP. We could've dropped 'rated_at' here.

temp_view_name = 'ratings'
df.createOrReplaceTempView(temp_view_name) # Register TEMP-VIEW in SparkSQL named: 'ratings'
df = spark_sesn.sql('''SELECT YEAR(dateTime) AS year,
                              MONTH(dateTime) AS month,
                              COUNT(*) AS count
                       FROM ratings
                       WHERE rating >= 4
                       GROUP BY year, month
                       ORDER BY year, month''')
df.show()
# =====================================================================================================
# NOTE: We can conveniently mix SparkSQL with DataFrame methods, and Catalyst will know what to do!
#       For example, omitting the final 'ORDER BY' clause above, combined with un-commenting the
#       following, results in the same answer: df = df.orderBy(F.asc('year'), F.asc('month')).
#       And actually, we chose to add the 'dateTime' column using a DF method above.
# =====================================================================================================
spark_sesn.catalog.dropTempView(temp_view_name)       # Drop a Temp-View. Safe to call even if non-existent.
spark_sesn.catalog.dropGlobalTempView(temp_view_name) # Drop a Global Temp-View. Safe to call even if non-existent.
del (df,)
# =====================================================================================================
+----+-----+-----+
|year|month|count|
+----+-----+-----+
|1997|    9| 3832|
|1997|   10| 5979|
|1997|   11|13623|
|1997|   12| 6686|
|1998|    1| 7299|
|1998|    2| 5757|
|1998|    3| 7232|
|1998|    4| 4967|
+----+-----+-----+

In [18]:
# =====================================================================================================
# 3 of 3) Via SparkSQL using the HIVE 'movielens' database (and meta-store).
# Query is sent to the HIVE DB via thrift API service at 'thrift://vps00:9083' ...
# =====================================================================================================
# Notice below that we employ neither DataFrames nor TEMP-VIEWS.
# Instead, we name a HIVE database and HIVE table within it via the 'FROM' clause ... 
# =====================================================================================================
# spark_sesn.sql('SELECT FROM_UNIXTIME(rated_at DIV 1000, "yyyy-MM-dd hh:mm:ss") \
#                AS dateTime FROM movielens.ratings LIMIT 3').show() # Example DateTime format string parts.

(hive_db, hive_db_table) = ('movielens','ratings')
spark_sesn.sql('''SELECT
                     FROM_UNIXTIME(rated_at DIV 1000, "yyyy") AS year,
                     FROM_UNIXTIME(rated_at DIV 1000, "MM") AS month,
                     COUNT(*) AS count
                  FROM %s.%s r
                  WHERE r.rating >= 4
                  GROUP BY year, month
                  ORDER BY year ASC, month ASC''' % (hive_db, hive_db_table)).show()
# =====================================================================================================
+----+-----+-----+
|year|month|count|
+----+-----+-----+
|1997|   09| 3832|
|1997|   10| 5979|
|1997|   11|13623|
|1997|   12| 6686|
|1998|   01| 7299|
|1998|   02| 5757|
|1998|   03| 7232|
|1998|   04| 4967|
+----+-----+-----+

In [19]:
# =====================================================================================================
# ANALYTIC: For each Year/Month/Genre/Rating GROUPING, print the count of ratings >= 4.
# DataFrame way ... 
# =====================================================================================================
df1 = df_dict['ratings']
df2 = df_dict['genres_movies']
df3 = df_dict['genres']
# =====================================================================================================
# Filter-IN only the necessary Rows and Columns first (filter-first best practice) ...
# =====================================================================================================
df1 = df1.filter(df1.rating >= 4).select('rating', 'rated_at', 'movie_id')
df2 = df2.select('movie_id', 'genre_id')
df3 # Columns: (id, name). Nothing that can be filtered-in/out here. =:)
# =====================================================================================================

# =====================================================================================================
df = df1.join(df2, df1.movie_id == df2.movie_id, 'INNER') # Add columns from 'genres_movies'.
df = df.join(df3, df.genre_id == df3.id, 'INNER')         # Add columns from 'genres'.
#df.show(4, False)                                        # Annoyingly, these add duplicate columns; we ignore
                                                          # that for now (see the join-on-name sketch below). :)
# =====================================================================================================

# =====================================================================================================
# Convert EPOCH-time to TIMESTAMP; then drop columns no longer needed after the join().
# =====================================================================================================
dateTime_col = F.from_unixtime((df['rated_at'].cast('BIGINT')/1000)).cast('TIMESTAMP').alias('dateTime')
# =====================================================================================================

# =====================================================================================================
# Group by Year, Month and Genre so we can count. We include 'rating' in the grouping only so we
# don't lose that column after the .groupBy(). There are other ways to handle this.
# =====================================================================================================
df = df.select('rating', 'name', dateTime_col)

df = df.groupBy(F.year('dateTime').alias('year'),
                F.month('dateTime').alias('month'),
                'name', 'rating').count() # Note: Before .count() we have a Grouped-DataFrame (gdf).

df.sort(F.asc('year'), F.asc('month'),
        F.asc('name'), F.asc('rating')).show(50, False)

# df.agg('ColExpr that specifies F.sum()|mean()|min()|max() etc. aggregation')
df.agg(F.sum('count').alias('4+_total_genre_ratings')).show() # B/c of the 1-to-N movie-to-genre(s) relationship,
                                                              # we expect this to exceed even the 100,000 total
print(df.count())                                             # movie ratings submitted.
# =====================================================================================================
del (df, df1, df2, df3)
+----+-----+-----------+------+-----+
|year|month|name       |rating|count|
+----+-----+-----------+------+-----+
|1997|9    |Action     |4     |685  |
|1997|9    |Action     |5     |353  |
|1997|9    |Adventure  |4     |361  |
|1997|9    |Adventure  |5     |211  |
|1997|9    |Animation  |4     |113  |
|1997|9    |Animation  |5     |63   |
|1997|9    |Children's |4     |190  |
|1997|9    |Children's |5     |73   |
|1997|9    |Comedy     |4     |760  |
|1997|9    |Comedy     |5     |366  |
|1997|9    |Crime      |4     |225  |
|1997|9    |Crime      |5     |156  |
|1997|9    |Documentary|4     |15   |
|1997|9    |Documentary|5     |13   |
|1997|9    |Drama      |4     |937  |
|1997|9    |Drama      |5     |619  |
|1997|9    |Fantasy    |4     |28   |
|1997|9    |Fantasy    |5     |11   |
|1997|9    |Film-Noir  |4     |51   |
|1997|9    |Film-Noir  |5     |25   |
|1997|9    |Horror     |4     |135  |
|1997|9    |Horror     |5     |59   |
|1997|9    |Musical    |4     |119  |
|1997|9    |Musical    |5     |60   |
|1997|9    |Mystery    |4     |105  |
|1997|9    |Mystery    |5     |59   |
|1997|9    |Romance    |4     |455  |
|1997|9    |Romance    |5     |257  |
|1997|9    |Sci-Fi     |4     |367  |
|1997|9    |Sci-Fi     |5     |218  |
|1997|9    |Thriller   |4     |577  |
|1997|9    |Thriller   |5     |325  |
|1997|9    |War        |4     |228  |
|1997|9    |War        |5     |182  |
|1997|9    |Western    |4     |45   |
|1997|9    |Western    |5     |20   |
|1997|10   |Action     |4     |903  |
|1997|10   |Action     |5     |563  |
|1997|10   |Adventure  |4     |481  |
|1997|10   |Adventure  |5     |348  |
|1997|10   |Animation  |4     |150  |
|1997|10   |Animation  |5     |128  |
|1997|10   |Children's |4     |264  |
|1997|10   |Children's |5     |153  |
|1997|10   |Comedy     |4     |1073 |
|1997|10   |Comedy     |5     |596  |
|1997|10   |Crime      |4     |311  |
|1997|10   |Crime      |5     |214  |
|1997|10   |Documentary|4     |27   |
|1997|10   |Documentary|5     |19   |
+----+-----+-----------+------+-----+
only showing top 50 rows

+----------------------+
|4+_total_genre_ratings|
+----------------------+
|                119131|
+----------------------+

288
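Regarding the duplicate-column annoyance noted in the cell above: joining on a column name (a string) instead of a column expression makes Spark emit the join key only once. A sketch of that variant for the first join:

r  = df_dict['ratings'].filter('rating >= 4').select('rating', 'rated_at', 'movie_id')
gm = df_dict['genres_movies'].select('movie_id', 'genre_id')
joined = r.join(gm, 'movie_id', 'inner')  # USING-style join: 'movie_id' appears once (and first).
print(joined.columns)                     # ['movie_id', 'rating', 'rated_at', 'genre_id']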
In [20]:
# =====================================================================================================
# ANALYTIC: For each Year/Month/Genre/Rating GROUPING, print the count of ratings >= 4.
# MySQL way ... 
# =====================================================================================================
#    Again, validating against the canonical data-store (MySQL) not only validates the above results,
#    but also the end-to-end ETL processes, including SQOOP --> HIVE/HDFS, and HIVE/HDFS --> Spark DFs.
# =====================================================================================================
!mysql --login-path=vps00 --table movielens -e \
                         'DROP TEMPORARY TABLE IF EXISTS nmvega; \
                          CREATE TEMPORARY TABLE nmvega \
                          SELECT \
                               YEAR(rated_at) AS year, \
                               MONTH(rated_at) AS month, \
                               g.name, \
                               r.rating, \
                               COUNT(*) AS count \
                          FROM ratings r \
                              INNER JOIN genres_movies gm ON r.movie_id = gm.movie_id \
                              INNER JOIN genres g ON gm.genre_id = g.id \
                          WHERE rating >=4 \
                          GROUP BY year, month, g.name, r.rating \
                          ORDER BY year, month, g.name, r.rating; \
                                                                 \
                          SELECT * FROM nmvega LIMIT 10; \
                          SELECT SUM(count) FROM nmvega; \
                          SELECT COUNT(*) FROM nmvega; \
                          DROP TEMPORARY TABLE IF EXISTS nmvega'
# =====================================================================================================
+------+-------+------------+--------+-------+
| year | month | name       | rating | count |
+------+-------+------------+--------+-------+
| 1997 |     9 | Action     |      4 |   685 |
| 1997 |     9 | Action     |      5 |   353 |
| 1997 |     9 | Adventure  |      4 |   361 |
| 1997 |     9 | Adventure  |      5 |   211 |
| 1997 |     9 | Animation  |      4 |   113 |
| 1997 |     9 | Animation  |      5 |    63 |
| 1997 |     9 | Children's |      4 |   190 |
| 1997 |     9 | Children's |      5 |    73 |
| 1997 |     9 | Comedy     |      4 |   760 |
| 1997 |     9 | Comedy     |      5 |   366 |
+------+-------+------------+--------+-------+
+------------+
| SUM(count) |
+------------+
|     119131 |
+------------+
+----------+
| COUNT(*) |
+----------+
|      288 |
+----------+
In [21]:
# =====================================================================================================
# ANALYTIC: Print the top-10 movies based on ratings by Students (by Occupation).
# =====================================================================================================
# -- 1) Solved first with HIVE SQL (via beeline(1)).
# -- 2) Solved second with MySQL (via mysql(1)).
# =====================================================================================================
# NOTE:
#   -- Not accounted for here in the interest of time, a better analytic result would be to also
#      factor in the quantity of ratings, and not just the straight Average. For example, a
#      4.8-Star movie average with 100 ratings *could* be better than another 4.8-Star movie
#      with just 10 ratings.
#   -- Occupation ID for: Students = 19
# =====================================================================================================
!export JAVA_HOME=/usr/lib/jvm/jre; \
    /usr/bin/beeline --outputformat=table -n nmvega -u 'jdbc:hive2://vps00:10000/movielens' -e \
            'SELECT movies.title AS title, ratings.movie_id AS movie_id, AVG(ratings.rating) as avg_rating \
             FROM users \
                INNER JOIN occupations ON users.occupation_id = occupations.id \
                INNER JOIN ratings ON users.id = ratings.user_id \
                INNER JOIN movies ON movies.id = ratings.movie_id \
             WHERE occupations.id = 19 \
             GROUP BY ratings.movie_id, movies.title \
             ORDER BY avg_rating DESC, ratings.movie_id ASC \
             LIMIT 10' 2>&1 | egrep '\||------' # Note: This is Map/Reduce so it will take a while.
# =====================================================================================================
!mysql --login-path=vps00 --table movielens -e \
            'SELECT m.title AS title, r.movie_id AS movie_id, AVG(r.rating) AS avg_rating \
             FROM users u \
                INNER JOIN occupations o ON u.occupation_id = o.id \
                INNER JOIN ratings r ON u.id = r.user_id \
                INNER JOIN movies m ON m.id = r.movie_id \
             WHERE o.name = "Student" \
             GROUP BY r.movie_id \
             ORDER BY avg_rating DESC, r.movie_id ASC \
             LIMIT 10'
# =====================================================================================================
+----------------------------------------------------+-----------+-------------+
|                       title                        | movie_id  | avg_rating  |
+----------------------------------------------------+-----------+-------------+
| Haunted World of Edward D. Wood Jr., The (1995)    | 115       | 5.0         |
| Once Upon a Time... When We Were Colored (1995)    | 279       | 5.0         |
| Paradise Lost: The Child Murders at Robin Hood Hills (1996) | 320       | 5.0         |
| Critical Care (1997)                               | 341       | 5.0         |
| Giant (1956)                                       | 614       | 5.0         |
| One Night Stand (1997)                             | 888       | 5.0         |
| Wild Things (1998)                                 | 914       | 5.0         |
| Inspector General, The (1949)                      | 968       | 5.0         |
| Shiloh (1997)                                      | 1015      | 5.0         |
| Mark of Zorro, The (1940)                          | 1116      | 5.0         |
+----------------------------------------------------+-----------+-------------+
+-------------------------------------------------------------+----------+------------+
| title                                                       | movie_id | avg_rating |
+-------------------------------------------------------------+----------+------------+
| Haunted World of Edward D. Wood Jr., The (1995)             |      115 |     5.0000 |
| Once Upon a Time... When We Were Colored (1995)             |      279 |     5.0000 |
| Paradise Lost: The Child Murders at Robin Hood Hills (1996) |      320 |     5.0000 |
| Critical Care (1997)                                        |      341 |     5.0000 |
| Giant (1956)                                                |      614 |     5.0000 |
| One Night Stand (1997)                                      |      888 |     5.0000 |
| Wild Things (1998)                                          |      914 |     5.0000 |
| Inspector General, The (1949)                               |      968 |     5.0000 |
| Shiloh (1997)                                               |     1015 |     5.0000 |
| Mark of Zorro, The (1940)                                   |     1116 |     5.0000 |
+-------------------------------------------------------------+----------+------------+
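
As flagged in the NOTE above, ranking by the straight average ignores how many ratings stand behind it. Below is a minimal count-weighted ("Bayesian") sketch; it is an illustration, not part of the pipeline. The names ratings_df and movies_df are hypothetical stand-ins for the ratings/movies DataFrames built earlier, the prior weight m is an arbitrary assumption, and for brevity it ranks across all users rather than Students only.

# ==============================================================================
# Hedged sketch: count-weighted (IMDb-style Bayesian) ranking. Illustration only.
# ==============================================================================
import pyspark.sql.functions as F

m = 25                                          # Assumed prior: ratings needed for full confidence.
C = ratings_df.agg(F.avg('rating')).first()[0]  # Global mean rating over all movies.

per_movie = (ratings_df.groupBy('movie_id')
             .agg(F.avg('rating').alias('R'),      # Per-movie mean rating.
                  F.count(F.lit(1)).alias('v')))   # Per-movie rating count.

# Weighted rating: (v/(v+m))*R + (m/(v+m))*C -- shrinks sparse averages toward the global mean.
weighted = per_movie.withColumn('weighted_rating',
    (F.col('v') / (F.col('v') + m)) * F.col('R') + (m / (F.col('v') + m)) * F.lit(C))

(weighted.join(movies_df, weighted.movie_id == movies_df.id)
         .select('title', 'movie_id', 'v', 'weighted_rating')
         .orderBy(F.col('weighted_rating').desc())
         .show(10, False))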

Interacting with Kafka using Spark 2.x.

Among other things, we'll use Spark as a Kafka Producer to send each 'movielens' table through a Kafka topic, then use Spark as a Kafka Consumer to recover it at the other end. Parallelism in Kafka is achieved by using multiple topics, and by using multiple partitions within topics (known as topic-partitions). Here we use just one topic (named 'movielens') with six (qty. 6) partitions, one exclusively for each table. This yields the worst possible parallelism, but we do it deliberately to demonstrate message-driven partitioning based on a message key (of its key/value pair). Note that while parallelism is not optimized here, redundancy does not necessarily suffer if one uses topic-partition replicas (though we don't do that here either =:)). The 'movielens' tables already have DataFrame representations, so each row of a DataFrame (or, equivalently, of a table) will constitute one Kafka message.


In [23]:
# ======================================================================================================
# Create Kafka topic: 'movielens' topic with 6-partitions ... 
# ======================================================================================================
from confluent_kafka.admin import AdminClient, NewTopic
import time, pprint           # time: crude propagation waits below; pprint: topic listing at the end.
# ---------------------------------------------------------------------------------
topics = ['movielens',]       # List of topics to create.
nbr_replicas = 1              # Number of replicas for each topic-partition.
#nbr_partitions = len(df_dict) # (Qty. 6). TableName-to-DF dict; from an earlier notebook cell.
nbr_partitions = 6
# ---------------------------------------------------------------------------------
conf  = {'bootstrap.servers': 'vps00:9092,vps00:9093,vps00:9094'}
admin = AdminClient(conf) # Kafka AdminClient.
# ---------------------------------------------------------------------------------
admin.delete_topics(topics); time.sleep(10) # Delete topic(s) just in case they already exist.
admin.create_topics([NewTopic(topic,nbr_partitions,nbr_replicas) for topic in topics]); time.sleep(10)
# ---------------------------------------------------------------------------------
pprint.pprint(admin.list_topics().topics)
# ---------------------------------------------------------------------------------
{'movielens': TopicMetadata(movielens, 6 partitions)}
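
The fixed time.sleep(10) waits above are a blunt instrument: AdminClient.delete_topics() and AdminClient.create_topics() each return a dict of {topic: future}, so we can block only for as long as the cluster actually needs. A sketch under that API, reusing the admin/NewTopic objects from the cell above (the error handling shown is illustrative, not what this notebook ran):

# ==============================================================================
# Hedged alternative to the sleep(10) waits: block on the AdminClient futures.
# ==============================================================================
fs = admin.create_topics([NewTopic('movielens', num_partitions=6, replication_factor=1)])
for topic, future in fs.items():
    try:
        future.result()                  # Returns None on success; raises on failure.
        print('Topic created:', topic)
    except Exception as e:
        print('Topic creation failed for %s: %s' % (topic, e))
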
In [24]:
# ======================================================================================================
# Produce Spark DataFrames into Kafka via df.write() ...
# ======================================================================================================
# Programmatically stream contents from each DF (table) to its per-DF topic-partition.
# ======================================================================================================

# =============================================================================================
# Below is an example of what's going into Kafka on a per 'movielens' topic-partition basis ...
# =============================================================================================
#  -- Each row of a DataFrame (table) becomes one Kafka key/value message.
#
#  -- All rows of a particular DataFrame (table) go to the same 'movielens' topic-partition; determined
#     by its constant-valued 'key' column.
#
#  -- We embed a 'table' column into the DataFrame to enable programmatic determination, at Kafka
#     Consume time, of which table a DataFrame corresponds to. Admittedly this is inefficient. More
#     efficient alternatives are to create one Kafka topic per table (of the same name), or to use
#     Avro and embed the table information in its Schema/metadata (which we do in a different
#     notebook).
#
#  -- While in an earlier cell we did add table-name metadata to the first column (the 'id' column) of
#     each DataFrame, that metadata is not included as part of column-values that go into Kafka (as you
#     can see in the table-snippets below). This is why we need to add a 'table' column.
#
#  -- Finally, as of this writing, df.write.format("kafka").option(...).save() does not offer a .option()
#     k/v pair to numerically specify a topic-partition to write to. We must therefore steer rows via
#     'key' column values, which Kafka's default partitioner hashes to choose a partition. We use
#     Python enumerate() to generate a 'key' on each loop below: '1', '2', '3', '4', '5', '6'.
# =============================================================================================
# =============================================================================================
# Examples of what's being Produced into Kafka after the transformations below ...
# =============================================================================================
# +---+------------------------------------------------------------+
# |key|value (This is the 'OCCUPATIONS' table; JSON packed values) |
# +---+------------------------------------------------------------+
# |2  |{"id":1,"name":"Administrator", "table":"occupations"}      |
# |2  |{"id":2,"name":"Artist",        "table":"occupations"}      |
# |2  |{"id":3,"name":"Doctor",        "table":"occupations"}      |
# +---+------------------------------------------------------------+

# +---+-------------------------------------------------------+
# |key|value (This is the 'GENRES' table, JSON packed values) |
# +---+-------------------------------------------------------+
# |5  |{"id":1,"name":"Action",    "table":"genres"}          |
# |5  |{"id":2,"name":"Adventure", "table":"genres"}          |
# |5  |{"id":3,"name":"Animation", "table":"genres"}          |
# +---+-------------------------------------------------------+

# -------------------------------------------------------------------------------
# Below are the successive (key,df) values produced by the loop, which we then massage as described...
# -------------------------------------------------------------------------------
# 1 DataFrame[id: int, movie_id: int, genre_id: int]
# 2 DataFrame[id: int, name: string]
# 3 DataFrame[id: int, user_id: int, movie_id: int, rating: int, rated_at: bigint]
# 4 DataFrame[id: int, name: string]
# 5 DataFrame[id: int, title: string, release_date: bigint]
# 6 DataFrame[id: int, age: int, gender: string, occupation_id: int, zip_code: string]
# -------------------------------------------------------------------------------

# ======================================================================================================
# ! [IMPORTANT]: After running the cell below, we immediately comment out its contents so as not
# to accidentally write duplicate content into Kafka !
# ======================================================================================================
for (key,df) in enumerate(df_dict.values(), 1):
    df.withColumn('key', F.lit(str(key))) \
      .withColumn('table', F.lit(df.schema['id'].metadata['table'])) \
      .select('key', F.to_json(F.struct(*(df.columns + ['table',]))).alias('value')) \
      .write \
          .format('kafka') \
          .option('kafka.bootstrap.servers', 'vps00:9092,vps00:9093,vps00:9094') \
          .option('topic', 'movielens') \
          .save()

del (key,df)
print('DONE!')
DONE!
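
Because the Kafka sink above steers rows by key hashing rather than an explicit partition option, it can be worth confirming where each key actually lands. A minimal probe sketch using confluent_kafka's delivery report (illustrative only: running it would add stray probe messages to the topic):

# ==============================================================================
# Hedged probe: reveal which partition the default partitioner picks per key.
# ==============================================================================
from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'vps00:9092,vps00:9093,vps00:9094'})

def report(err, msg):
    if err is not None:
        print('Delivery failed:', err)
    else:  # msg.partition() reveals where the keyed message landed.
        print('key=%s -> partition=%d' % (msg.key().decode(), msg.partition()))

for key in ['1', '2', '3', '4', '5', '6']:
    p.produce('movielens', key=key, value='probe', on_delivery=report)
p.flush()  # Flushing triggers the delivery reports.
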
In [25]:
# ======================================================================================================
# Consume a few table samples published above (to verify it worked) ...
# ======================================================================================================
from confluent_kafka import Consumer as kafka_Consumer, TopicPartition
# -----------------------------------------------------------------------------------------
conf = { 'bootstrap.servers' : 'vps00:9092,vps00:9093,vps00:9094',
         'group.id' : 'movielens',
         'enable.auto.commit' : False }
# -----------------------------------------------------------------------------------------
consumers = [(kafka_Consumer(conf),partition) for partition in range(0,nbr_partitions)]
[ c[0].assign([TopicPartition('movielens', c[1], 0),]) for c in consumers ]
# i.e. TopicPartition('movielens', 0, 0), TopicPartition('movielens', 1, 0), ..., TopicPartition('movielens', 5, 0)
# -----------------------------------------------------------------------------------------
for c in consumers:
    # =====================================================================================
    # Validate with Python (confluent_kafka) ...
    # =====================================================================================
    print('='*50, '\nTABLE (TOPIC-PARTITION): %s\n' % c[1], '='*50, sep='')
    for _ in range(5): print(c[0].poll().value()) # Blocking call b/c .poll() has no timeout.
    c[0].close() # Sample the first-5 rows, then close the consumer.
    # =====================================================================================
    
    # =====================================================================================
    # Validate with the kafka-console-consumer.sh (CLI) ...
    # =====================================================================================
    !echo ""; ./stack.d/KAFKA.d/latest/bin/kafka-console-consumer.sh \
       --partition {c[1]} \
       --bootstrap-server vps00:9092 \
       --topic 'movielens' \
       --from-beginning \
       --max-messages 5
    # =====================================================================================

print('DONE!')
==================================================
TABLE (TOPIC-PARTITION): 0
==================================================
b'{"id":1,"title":"Toy Story (1995)","release_date":788936400000,"table":"movies"}'
b'{"id":2,"title":"GoldenEye (1995)","release_date":788936400000,"table":"movies"}'
b'{"id":3,"title":"Four Rooms (1995)","release_date":788936400000,"table":"movies"}'
b'{"id":4,"title":"Get Shorty (1995)","release_date":788936400000,"table":"movies"}'
b'{"id":5,"title":"Copycat (1995)","release_date":788936400000,"table":"movies"}'

{"id":1,"title":"Toy Story (1995)","release_date":788936400000,"table":"movies"}
{"id":2,"title":"GoldenEye (1995)","release_date":788936400000,"table":"movies"}
{"id":3,"title":"Four Rooms (1995)","release_date":788936400000,"table":"movies"}
{"id":4,"title":"Get Shorty (1995)","release_date":788936400000,"table":"movies"}
{"id":5,"title":"Copycat (1995)","release_date":788936400000,"table":"movies"}
Processed a total of 5 messages
==================================================
TABLE (TOPIC-PARTITION): 1
==================================================
b'{"id":1,"name":"Action","table":"genres"}'
b'{"id":2,"name":"Adventure","table":"genres"}'
b'{"id":3,"name":"Animation","table":"genres"}'
b'{"id":4,"name":"Children\'s","table":"genres"}'
b'{"id":5,"name":"Comedy","table":"genres"}'

{"id":1,"name":"Action","table":"genres"}
{"id":2,"name":"Adventure","table":"genres"}
{"id":3,"name":"Animation","table":"genres"}
{"id":4,"name":"Children's","table":"genres"}
{"id":5,"name":"Comedy","table":"genres"}
Processed a total of 5 messages
==================================================
TABLE (TOPIC-PARTITION): 2
==================================================
b'{"id":1,"name":"Administrator","table":"occupations"}'
b'{"id":2,"name":"Artist","table":"occupations"}'
b'{"id":3,"name":"Doctor","table":"occupations"}'
b'{"id":4,"name":"Educator","table":"occupations"}'
b'{"id":5,"name":"Engineer","table":"occupations"}'

{"id":1,"name":"Administrator","table":"occupations"}
{"id":2,"name":"Artist","table":"occupations"}
{"id":3,"name":"Doctor","table":"occupations"}
{"id":4,"name":"Educator","table":"occupations"}
{"id":5,"name":"Engineer","table":"occupations"}
Processed a total of 5 messages
==================================================
TABLE (TOPIC-PARTITION): 3
==================================================
b'{"id":1,"movie_id":1,"genre_id":3,"table":"genres_movies"}'
b'{"id":2,"movie_id":1,"genre_id":4,"table":"genres_movies"}'
b'{"id":3,"movie_id":1,"genre_id":5,"table":"genres_movies"}'
b'{"id":4,"movie_id":2,"genre_id":1,"table":"genres_movies"}'
b'{"id":5,"movie_id":2,"genre_id":2,"table":"genres_movies"}'

{"id":1,"movie_id":1,"genre_id":3,"table":"genres_movies"}
{"id":2,"movie_id":1,"genre_id":4,"table":"genres_movies"}
{"id":3,"movie_id":1,"genre_id":5,"table":"genres_movies"}
{"id":4,"movie_id":2,"genre_id":1,"table":"genres_movies"}
{"id":5,"movie_id":2,"genre_id":2,"table":"genres_movies"}
Processed a total of 5 messages
==================================================
TABLE (TOPIC-PARTITION): 4
==================================================
b'{"id":1,"age":24,"gender":"M","occupation_id":20,"zip_code":"85711","table":"users"}'
b'{"id":2,"age":53,"gender":"F","occupation_id":14,"zip_code":"94043","table":"users"}'
b'{"id":3,"age":23,"gender":"M","occupation_id":21,"zip_code":"32067","table":"users"}'
b'{"id":4,"age":24,"gender":"M","occupation_id":20,"zip_code":"43537","table":"users"}'
b'{"id":5,"age":33,"gender":"F","occupation_id":14,"zip_code":"15213","table":"users"}'

{"id":1,"age":24,"gender":"M","occupation_id":20,"zip_code":"85711","table":"users"}
{"id":2,"age":53,"gender":"F","occupation_id":14,"zip_code":"94043","table":"users"}
{"id":3,"age":23,"gender":"M","occupation_id":21,"zip_code":"32067","table":"users"}
{"id":4,"age":24,"gender":"M","occupation_id":20,"zip_code":"43537","table":"users"}
{"id":5,"age":33,"gender":"F","occupation_id":14,"zip_code":"15213","table":"users"}
Processed a total of 5 messages
==================================================
TABLE (TOPIC-PARTITION): 5
==================================================
b'{"id":1,"user_id":196,"movie_id":242,"rating":3,"rated_at":881240149000,"table":"ratings"}'
b'{"id":2,"user_id":186,"movie_id":302,"rating":3,"rated_at":891706942000,"table":"ratings"}'
b'{"id":3,"user_id":22,"movie_id":377,"rating":1,"rated_at":878876316000,"table":"ratings"}'
b'{"id":4,"user_id":244,"movie_id":51,"rating":2,"rated_at":880596123000,"table":"ratings"}'
b'{"id":5,"user_id":166,"movie_id":346,"rating":1,"rated_at":886386796000,"table":"ratings"}'

{"id":1,"user_id":196,"movie_id":242,"rating":3,"rated_at":881240149000,"table":"ratings"}
{"id":2,"user_id":186,"movie_id":302,"rating":3,"rated_at":891706942000,"table":"ratings"}
{"id":3,"user_id":22,"movie_id":377,"rating":1,"rated_at":878876316000,"table":"ratings"}
{"id":4,"user_id":244,"movie_id":51,"rating":2,"rated_at":880596123000,"table":"ratings"}
{"id":5,"user_id":166,"movie_id":346,"rating":1,"rated_at":886386796000,"table":"ratings"}
Processed a total of 5 messages
DONE!
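
One caveat on the loop above: a bare .poll() blocks forever if a partition delivers fewer messages than expected. A more defensive sketch (the timeout value is an arbitrary assumption, and 'consumer' stands in for c[0] above):

# ==============================================================================
# Hedged defensive consume loop: poll with a timeout and check for errors.
# ==============================================================================
msgs = []
while len(msgs) < 5:
    msg = consumer.poll(timeout=5.0)  # 'consumer' stands in for c[0] above.
    if msg is None:
        break                         # Timed out; stop rather than block forever.
    if msg.error():
        print('Consumer error:', msg.error())
        continue
    msgs.append(msg.value())
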
In [26]:
# ======================================================================================================
# A Spark app to read the Kafka topics (movielens dataset) and print to Console.
# ======================================================================================================
# Part 1-of-3: Deserialize Kafka topic-partitions into respective DataFrames.
# ======================================================================================================
# In this cell we loop through each 'movielens' topic-partition (each of which corresponds to one
# DataFrame/table) and de-serialize its message content into its own Spark DataFrame. The DataFrame
# Row()s we consume (i.e. the Kafka messages) carry, as their values, columns packed into
# (denormalized) JSON strings.
# In the following cell, we unpack and 're-normalize' them back into proper, separate DataFrame columns.
# ======================================================================================================
df_dict = {}
for partition in range(0,nbr_partitions):
    df = spark_sesn.read  \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "vps00:9092,vps00:9093,vps00:9094") \
        .option("assign", json.dumps({"movielens" : [partition]})) \
        .option("startingOffsets", "earliest") \
        .load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .drop('key') # We no longer need the Kafka partitioning key.
    
    # [ value: {"id":5, "user_id":166, "movie_id":346, "rating":1, "rated_at":886386796000, "table":"ratings"}, ]
    table_name = json.loads(df.take(1)[0].value)['table'] # Get table-name. df.take(N) returns a list of Row()s.
    df_dict[table_name] = df
    print('TOPIC: movielens | PARTITION: %s | TABLE: %s | KAFKA K/V MESSAGES CONSUMED: %d'
          % (partition, table_name, df.count()))
    df.show(3, False)

pprint.pprint(df_dict)
TOPIC: movielens | PARTITION: 0 | TABLE: movies | KAFKA K/V MESSAGES CONSUMED: 1682
+---------------------------------------------------------------------------------+
|value                                                                            |
+---------------------------------------------------------------------------------+
|{"id":1,"title":"Toy Story (1995)","release_date":788936400000,"table":"movies"} |
|{"id":2,"title":"GoldenEye (1995)","release_date":788936400000,"table":"movies"} |
|{"id":3,"title":"Four Rooms (1995)","release_date":788936400000,"table":"movies"}|
+---------------------------------------------------------------------------------+
only showing top 3 rows

TOPIC: movielens | PARTITION: 1 | TABLE: genres | KAFKA K/V MESSAGES CONSUMED: 18
+--------------------------------------------+
|value                                       |
+--------------------------------------------+
|{"id":1,"name":"Action","table":"genres"}   |
|{"id":2,"name":"Adventure","table":"genres"}|
|{"id":3,"name":"Animation","table":"genres"}|
+--------------------------------------------+
only showing top 3 rows

TOPIC: movielens | PARTITION: 2 | TABLE: occupations | KAFKA K/V MESSAGES CONSUMED: 21
+-----------------------------------------------------+
|value                                                |
+-----------------------------------------------------+
|{"id":1,"name":"Administrator","table":"occupations"}|
|{"id":2,"name":"Artist","table":"occupations"}       |
|{"id":3,"name":"Doctor","table":"occupations"}       |
+-----------------------------------------------------+
only showing top 3 rows

TOPIC: movielens | PARTITION: 3 | TABLE: genres_movies | KAFKA K/V MESSAGES CONSUMED: 2891
+----------------------------------------------------------+
|value                                                     |
+----------------------------------------------------------+
|{"id":1,"movie_id":1,"genre_id":3,"table":"genres_movies"}|
|{"id":2,"movie_id":1,"genre_id":4,"table":"genres_movies"}|
|{"id":3,"movie_id":1,"genre_id":5,"table":"genres_movies"}|
+----------------------------------------------------------+
only showing top 3 rows

TOPIC: movielens | PARTITION: 4 | TABLE: users | KAFKA K/V MESSAGES CONSUMED: 943
+------------------------------------------------------------------------------------+
|value                                                                               |
+------------------------------------------------------------------------------------+
|{"id":1,"age":24,"gender":"M","occupation_id":20,"zip_code":"85711","table":"users"}|
|{"id":2,"age":53,"gender":"F","occupation_id":14,"zip_code":"94043","table":"users"}|
|{"id":3,"age":23,"gender":"M","occupation_id":21,"zip_code":"32067","table":"users"}|
+------------------------------------------------------------------------------------+
only showing top 3 rows

TOPIC: movielens | PARTITION: 5 | TABLE: ratings | KAFKA K/V MESSAGES CONSUMED: 100000
+------------------------------------------------------------------------------------------+
|value                                                                                     |
+------------------------------------------------------------------------------------------+
|{"id":1,"user_id":196,"movie_id":242,"rating":3,"rated_at":881240149000,"table":"ratings"}|
|{"id":2,"user_id":186,"movie_id":302,"rating":3,"rated_at":891706942000,"table":"ratings"}|
|{"id":3,"user_id":22,"movie_id":377,"rating":1,"rated_at":878876316000,"table":"ratings"} |
+------------------------------------------------------------------------------------------+
only showing top 3 rows

{'genres': DataFrame[value: string],
 'genres_movies': DataFrame[value: string],
 'movies': DataFrame[value: string],
 'occupations': DataFrame[value: string],
 'ratings': DataFrame[value: string],
 'users': DataFrame[value: string]}
In [27]:
# ======================================================================================================
# A Spark app to read the Kafka topics (movielens dataset) and print to Console.
# ======================================================================================================
# Part 2-of-3: Re-normalize DataFrame values packed as JSON strings by separating them into proper columns.
# ======================================================================================================

# ======================================================================================================
# Using the following three (qty. 3) pieces of output for the 'RATINGS' table as an example, we show
# how we arrive at the DataFrame code below that RE-NORMALIZES the 'ratings' DataFrame. The same
# logic applies to RE-NORMALIZING the other DataFrames, too.
# ======================================================================================================

# ======================================================================================================
# (1) >>> print(ratings_df.schema):
#    -- This gives us the column-names and datatypes we'll need below (for ratings_df2) ...
# ======================================================================================================
#      StructType(List(StructField(id,       IntegerType, true),
#                      StructField(user_id,  IntegerType, true),
#                      StructField(movie_id, IntegerType, true),
#                      StructField(rating,   IntegerType, true),
#                      StructField(rated_at, LongType,    true)))
# ======================================================================================================

# ======================================================================================================
# (2) >>> print(df_dict['ratings'].schema):
#    -- Every topic-partition/table emerging from Kafka has this same VALUE-only schema (recalling
#       that above we df.drop()ped the no-longer-needed 'KEY' column) ...
# ======================================================================================================
#      StructType(List(StructField(value,StringType,true)))
# ======================================================================================================

# ======================================================================================================
# (3) >>> df_dict['ratings'].show(3, False)
#    -- And we remind ourselves here that the values in column 'value' are JSON strings, because we
#       used F.to_json() going into Kafka. We thus use F.from_json() emerging from Kafka below.
# ======================================================================================================
#      +--------------------------------------------------------------------------------------------+
#      |value                                                                                       |
#      +--------------------------------------------------------------------------------------------+
#      | {"id":1,"user_id":196,"movie_id":242,"rating":3,"rated_at":881240149000,"table":"ratings"} |
#      | {"id":2,"user_id":186,"movie_id":302,"rating":3,"rated_at":891706942000,"table":"ratings"} |
#      | {"id":3,"user_id":22,"movie_id":377,"rating":1,"rated_at":878876316000,"table":"ratings"}  |
#      +--------------------------------------------------------------------------------------------+
# ======================================================================================================
df_ratings = df_dict['ratings'].select(
    F.from_json(F.col('value'), "id INT").getItem('id').alias("id", metadata = {'table' : 'ratings'}),
    F.from_json(F.col('value'), "user_id INT").getItem('user_id').alias("user_id"),
    F.from_json(F.col('value'), "movie_id INT").getItem('movie_id').alias("movie_id"),
    F.from_json(F.col('value'), "rating INT").getItem('rating').alias("rating"),
    F.from_json(F.col('value'), "rated_at LONG").getItem('rated_at').alias("rated_at"),)

df_genres = df_dict['genres'].select(
    F.from_json(F.col('value'), "id INT").getItem('id').alias("id", metadata = {'table' : 'genres'}),
    F.from_json(F.col('value'), "name STRING").getItem('name').alias("name"),)

df_genres_movies = df_dict['genres_movies'].select(
    F.from_json(F.col('value'), "id INT").getItem('id').alias("id", metadata = {'table' : 'genres_movies'}),
    F.from_json(F.col('value'), "movie_id INT").getItem('movie_id').alias("movie_id"),
    F.from_json(F.col('value'), "genre_id INT").getItem('genre_id').alias("genre_id"),)

df_occupations = df_dict['occupations'].select(
    F.from_json(F.col('value'), "id INT").getItem('id').alias("id", metadata = {'table' : 'occupations'}),
    F.from_json(F.col('value'), "name STRING").getItem('name').alias("name"),)

df_movies = df_dict['movies'].select(
    F.from_json(F.col('value'), "id INT").getItem('id').alias("id", metadata = {'table' : 'movies'}),
    F.from_json(F.col('value'), "title STRING").getItem('title').alias("title"),
    F.from_json(F.col('value'), "release_date LONG").getItem('release_date').alias("release_date"),)

df_users = df_dict['users'].select(
    F.from_json(F.col('value'), "id INT").getItem('id').alias("id", metadata = {'table' : 'users'}),
    F.from_json(F.col('value'), "age INT").getItem('age').alias("age"),
    F.from_json(F.col('value'), "gender STRING").getItem('gender').alias("gender"),
    F.from_json(F.col('value'), "occupation_id INT").getItem('occupation_id').alias("occupation_id"),
    F.from_json(F.col('value'), "zip_code STRING").getItem('zip_code').alias("zip_code"),)

print('DONE!')
DONE!
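
One design note on the cell above: calling F.from_json() once per column re-parses the same JSON string for every column. A single-parse sketch for 'ratings' only (equivalent output; the composite DDL schema string is the only new piece, and unlike above it does not re-attach the {'table': 'ratings'} metadata to 'id'):

# ==============================================================================
# Hedged single-parse alternative: parse the JSON once, then fan out the struct.
# ==============================================================================
ratings_schema = 'id INT, user_id INT, movie_id INT, rating INT, rated_at LONG'
df_ratings_alt = (df_dict['ratings']
    .select(F.from_json(F.col('value'), ratings_schema).alias('j'))
    .select('j.id', 'j.user_id', 'j.movie_id', 'j.rating', 'j.rated_at'))
df_ratings_alt.show(3, False)
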
In [28]:
# ======================================================================================================
# A Spark app to read the Kafka topics (movielens dataset) and print to Console.
# ======================================================================================================
# Part 3-of-3: Print the normalized DataFrame results to console.
# As an ETL sanity-check (i.e. JSON denormalizing; serializing/deserializing over Kafka; JSON
# re-normalizing; etc.), we compare DataFrame output with MySQL output (the canonical source).
# ======================================================================================================
for df in [df_genres, df_genres_movies, df_movies, df_occupations, df_ratings, df_users]:
    tbl = df.schema[df.columns[0]].metadata['table']
    print("=======================================================")
    print('TABLE:', tbl.upper())
    print("=======================================================")
    print("VIA: SparkDF --> Kafka --> SparkDF")
    print("=======================================================")
    df.show(5, False) # Spark: Print first 5-rows.
    df.where('id == 16').show(1, False)  # Spark: Print an arbitrary row (id = 16).
    print('COUNT:', df.count()) # Spark: Print total row count (confirms all Kafka messages were consumed).
    
    print("=======================================================")
    print("VIA: MySQL canonical source") # The following emit same as above, but via MySQL canonical source.
    print("=======================================================")
    !mysql --login-path=vps00 movielens -e "SELECT * FROM {tbl} LIMIT 5"
    !mysql --login-path=vps00 movielens -e "SELECT * FROM {tbl} WHERE id = 16"
    !mysql --login-path=vps00 movielens -e "SELECT COUNT(*) AS COUNT FROM {tbl}"
    print("=======================================================\n")
    # ------------------------------------------------------------------
=======================================================
TABLE: GENRES
=======================================================
VIA: SparkDF --> Kafka --> SparkDF
=======================================================
+---+----------+
|id |name      |
+---+----------+
|1  |Action    |
|2  |Adventure |
|3  |Animation |
|4  |Children's|
|5  |Comedy    |
+---+----------+
only showing top 5 rows

+---+--------+
|id |name    |
+---+--------+
|16 |Thriller|
+---+--------+

COUNT: 18
=======================================================
VIA: MySQL canonical source
=======================================================
+----+------------+
| id | name       |
+----+------------+
|  1 | Action     |
|  2 | Adventure  |
|  3 | Animation  |
|  4 | Children's |
|  5 | Comedy     |
+----+------------+
+----+----------+
| id | name     |
+----+----------+
| 16 | Thriller |
+----+----------+
+-------+
| COUNT |
+-------+
|    18 |
+-------+
=======================================================

=======================================================
TABLE: GENRES_MOVIES
=======================================================
VIA: SparkDF --> Kafka --> SparkDF
=======================================================
+---+--------+--------+
|id |movie_id|genre_id|
+---+--------+--------+
|1  |1       |3       |
|2  |1       |4       |
|3  |1       |5       |
|4  |2       |1       |
|5  |2       |2       |
+---+--------+--------+
only showing top 5 rows

+---+--------+--------+
|id |movie_id|genre_id|
+---+--------+--------+
|16 |7       |15      |
+---+--------+--------+

COUNT: 2891
=======================================================
VIA: MySQL canonical source
=======================================================
+----+----------+----------+
| id | movie_id | genre_id |
+----+----------+----------+
|  1 |        1 |        3 |
|  2 |        1 |        4 |
|  3 |        1 |        5 |
|  4 |        2 |        1 |
|  5 |        2 |        2 |
+----+----------+----------+
+----+----------+----------+
| id | movie_id | genre_id |
+----+----------+----------+
| 16 |        7 |       15 |
+----+----------+----------+
+-------+
| COUNT |
+-------+
|  2891 |
+-------+
=======================================================

=======================================================
TABLE: MOVIES
=======================================================
VIA: SparkDF --> Kafka --> SparkDF
=======================================================
+---+-----------------+------------+
|id |title            |release_date|
+---+-----------------+------------+
|1  |Toy Story (1995) |788936400000|
|2  |GoldenEye (1995) |788936400000|
|3  |Four Rooms (1995)|788936400000|
|4  |Get Shorty (1995)|788936400000|
|5  |Copycat (1995)   |788936400000|
+---+-----------------+------------+
only showing top 5 rows

+---+----------------------------------+------------+
|id |title                             |release_date|
+---+----------------------------------+------------+
|16 |French Twist (Gazon maudit) (1995)|788936400000|
+---+----------------------------------+------------+

COUNT: 1682
=======================================================
VIA: MySQL canonical source
=======================================================
+----+-------------------+--------------+
| id | title             | release_date |
+----+-------------------+--------------+
|  1 | Toy Story (1995)  | 1995-01-01   |
|  2 | GoldenEye (1995)  | 1995-01-01   |
|  3 | Four Rooms (1995) | 1995-01-01   |
|  4 | Get Shorty (1995) | 1995-01-01   |
|  5 | Copycat (1995)    | 1995-01-01   |
+----+-------------------+--------------+
+----+------------------------------------+--------------+
| id | title                              | release_date |
+----+------------------------------------+--------------+
| 16 | French Twist (Gazon maudit) (1995) | 1995-01-01   |
+----+------------------------------------+--------------+
+-------+
| COUNT |
+-------+
|  1682 |
+-------+
=======================================================

=======================================================
TABLE: OCCUPATIONS
=======================================================
VIA: SparkDF --> Kafka --> SparkDF
=======================================================
+---+-------------+
|id |name         |
+---+-------------+
|1  |Administrator|
|2  |Artist       |
|3  |Doctor       |
|4  |Educator     |
|5  |Engineer     |
+---+-------------+
only showing top 5 rows

+---+-------+
|id |name   |
+---+-------+
|16 |Retired|
+---+-------+

COUNT: 21
=======================================================
VIA: MySQL canonical source
=======================================================
+----+---------------+
| id | name          |
+----+---------------+
|  1 | Administrator |
|  2 | Artist        |
|  3 | Doctor        |
|  4 | Educator      |
|  5 | Engineer      |
+----+---------------+
+----+---------+
| id | name    |
+----+---------+
| 16 | Retired |
+----+---------+
+-------+
| COUNT |
+-------+
|    21 |
+-------+
=======================================================

=======================================================
TABLE: RATINGS
=======================================================
VIA: SparkDF --> Kafka --> SparkDF
=======================================================
+---+-------+--------+------+------------+
|id |user_id|movie_id|rating|rated_at    |
+---+-------+--------+------+------------+
|1  |196    |242     |3     |881240149000|
|2  |186    |302     |3     |891706942000|
|3  |22     |377     |1     |878876316000|
|4  |244    |51      |2     |880596123000|
|5  |166    |346     |1     |886386796000|
+---+-------+--------+------+------------+
only showing top 5 rows

+---+-------+--------+------+------------+
|id |user_id|movie_id|rating|rated_at    |
+---+-------+--------+------+------------+
|16 |303    |785     |3     |879474518000|
+---+-------+--------+------+------------+

COUNT: 100000
=======================================================
VIA: MySQL canonical source
=======================================================
+----+---------+----------+--------+---------------------+
| id | user_id | movie_id | rating | rated_at            |
+----+---------+----------+--------+---------------------+
|  1 |     196 |      242 |      3 | 1997-12-04 07:55:49 |
|  2 |     186 |      302 |      3 | 1998-04-04 11:22:22 |
|  3 |      22 |      377 |      1 | 1997-11-06 23:18:36 |
|  4 |     244 |       51 |      2 | 1997-11-26 21:02:03 |
|  5 |     166 |      346 |      1 | 1998-02-01 21:33:16 |
+----+---------+----------+--------+---------------------+
+----+---------+----------+--------+---------------------+
| id | user_id | movie_id | rating | rated_at            |
+----+---------+----------+--------+---------------------+
| 16 |     303 |      785 |      3 | 1997-11-13 21:28:38 |
+----+---------+----------+--------+---------------------+
+--------+
| COUNT  |
+--------+
| 100000 |
+--------+
=======================================================

=======================================================
TABLE: USERS
=======================================================
VIA: SparkDF --> Kafka --> SparkDF
=======================================================
+---+---+------+-------------+--------+
|id |age|gender|occupation_id|zip_code|
+---+---+------+-------------+--------+
|1  |24 |M     |20           |85711   |
|2  |53 |F     |14           |94043   |
|3  |23 |M     |21           |32067   |
|4  |24 |M     |20           |43537   |
|5  |33 |F     |14           |15213   |
+---+---+------+-------------+--------+
only showing top 5 rows

+---+---+------+-------------+--------+
|id |age|gender|occupation_id|zip_code|
+---+---+------+-------------+--------+
|16 |21 |M     |6            |10309   |
+---+---+------+-------------+--------+

COUNT: 943
=======================================================
VIA: MySQL canonical source
=======================================================
+----+------+--------+---------------+----------+
| id | age  | gender | occupation_id | zip_code |
+----+------+--------+---------------+----------+
|  1 |   24 | M      |            20 | 85711    |
|  2 |   53 | F      |            14 | 94043    |
|  3 |   23 | M      |            21 | 32067    |
|  4 |   24 | M      |            20 | 43537    |
|  5 |   33 | F      |            14 | 15213    |
+----+------+--------+---------------+----------+
+----+------+--------+---------------+----------+
| id | age  | gender | occupation_id | zip_code |
+----+------+--------+---------------+----------+
| 16 |   21 | M      |             6 | 10309    |
+----+------+--------+---------------+----------+
+-------+
| COUNT |
+-------+
|   943 |
+-------+
=======================================================
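
One cosmetic difference above: release_date and rated_at surface on the Spark side as epoch milliseconds, while MySQL shows calendar dates/timestamps. A hedged display-only sketch to render them human-readably (the division by 1000 assumes millisecond epochs, consistent with values like 788936400000 above):

# ==============================================================================
# Hedged display step: convert epoch-millisecond columns back for readability.
# ==============================================================================
df_movies.withColumn('release_date', F.from_unixtime(F.col('release_date') / 1000).cast('date')) \
         .show(5, False)
df_ratings.withColumn('rated_at', F.from_unixtime(F.col('rated_at') / 1000).cast('timestamp')) \
          .show(5, False)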

In [29]:
# ======================================================================================================
# ANALYTIC: Write a Spark app to:
#  -- (1) Determine the Maximum Rating of each genre.
#  -- (2) Determine which genre is most rated by the users.
# ======================================================================================================
df1 = df_ratings.drop(*['id', 'user_id', 'rated_at']) # Filter-out (.drop()) unneeded columns.
df2 = df_genres_movies.drop('id') # 'id' is meaningless in: id, genre_id, movie_id; so .drop() it. 
df3 = df_genres.withColumnRenamed('id', 'genre_id') # So we can drop both 'genre_id' columns w/ one .drop() later.

df4 = df1.join(df2, df1.movie_id == df2.movie_id, 'INNER').drop('movie_id') # Drops both 'movie_id' columns.
df5 = df4.join(df3, df4.genre_id == df3.genre_id, 'INNER').drop('genre_id') # Drops both 'genre_id' columns.
df5 = df5.withColumn('rating', df5.rating.cast(T.IntegerType())) # Kafka delivered KEYS & VALUES as
                                                                 # strings, but F.from_json() above
                                                                 # already typed 'rating' as INT; this
                                                                 # cast is a defensive no-op ensuring
                                                                 # the numeric aggregations after
                                                                 # groupBy() (max() and count()) work.

gdf = df5.groupBy(df5.name) # Group by genre names so we can aggregate within each group.

# ===============================================================================
# Highest rating of each genre ...
# ===============================================================================
gdf.max('rating').withColumnRenamed('max(rating)', 'max_rating').withColumnRenamed('name', 'genre').show()
# ===============================================================================

# ===============================================================================
# Num ratings per genre ...
# ===============================================================================
gdf.count().withColumnRenamed('count', 'nbr_of_ratings') \
           .withColumnRenamed('name', 'genre') \
           .sort(F.col('nbr_of_ratings').desc()).show()
# ===============================================================================
del (df1, df2, df3, df4, df5, gdf)
+-----------+----------+
|      genre|max_rating|
+-----------+----------+
|      Crime|         5|
|    Romance|         5|
|   Thriller|         5|
|  Adventure|         5|
| Children's|         5|
|      Drama|         5|
|        War|         5|
|Documentary|         5|
|    Fantasy|         5|
|    Mystery|         5|
|    Musical|         5|
|  Animation|         5|
|  Film-Noir|         5|
|     Horror|         5|
|    Western|         5|
|     Comedy|         5|
|     Action|         5|
|     Sci-Fi|         5|
+-----------+----------+

+-----------+--------------+
|      genre|nbr_of_ratings|
+-----------+--------------+
|      Drama|         39895|
|     Comedy|         29832|
|     Action|         25589|
|   Thriller|         21872|
|    Romance|         19461|
|  Adventure|         13753|
|     Sci-Fi|         12730|
|        War|          9398|
|      Crime|          8055|
| Children's|          7182|
|     Horror|          5317|
|    Mystery|          5245|
|    Musical|          4954|
|  Animation|          3605|
|    Western|          1854|
|  Film-Noir|          1733|
|    Fantasy|          1352|
|Documentary|           758|
+-----------+--------------+
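
As a design note, the two aggregations above each scan the grouped data separately; a single .agg() pass (a sketch using the same gdf built above, prior to the del) computes both together:

# ==============================================================================
# Hedged single-pass alternative: max rating and rating count per genre at once.
# ==============================================================================
gdf.agg(F.max('rating').alias('max_rating'),
        F.count(F.lit(1)).alias('nbr_of_ratings')) \
   .withColumnRenamed('name', 'genre') \
   .sort(F.col('nbr_of_ratings').desc()) \
   .show()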

In [ ]: