Hadoop Learning Series – Hive, Flume, HDFS, and Retail Analysis

This is a guest article by 47Line Technologies.

Last week we introduced the Big Data ecosystem and showed a glimpse of its possibilities. This week we take one industry (retail) use case and illustrate how the various tools can be orchestrated to provide insights.

Background

The last couple of decades have seen a tectonic shift in the retail industry. Hawkers and mom-and-pop stores are being sidelined by heavyweight retail hypermarkets that operate in a complex landscape of franchisees, JVs, and multi-partner vendors. In this kind of environment, try visualizing the inventory, sales, and supplier information for thousands of SKUs (Stock Keeping Units) per store, multiply that by several thousand stores across cities, states, and even countries, over days, months, and years, and you will realize the volume of data these retailers collect.

One such retail hypermarket, let’s call it BigRetail, had 5 years of data: a vast, mostly semi-structured dataset that it wanted analyzed for trends and patterns.

The Problem

  • The 5-year dataset was 13TB in size.
  • Traditional business intelligence (BI) tools work best in the presence of a pre-defined schema. The BigRetail dataset was mostly logs which didn’t conform to any specific schema.
  • BigRetail took close to half a day each week to move the data into its BI systems. They wanted to reduce this time.
  • Queries over this dataset took hours.

The Solution

This is where Hadoop shines in all its glory!
The problem is two-fold:
Problem 1: Moving the logs into HDFS periodically
Problem 2: Performing analysis on this HDFS dataset

As we saw in the previous post, Apache Sqoop is used to move structured datasets into HDFS. Alas! How do we move semi-structured data? Fret not. Apache Flume is designed specifically for collecting, aggregating, and moving large amounts of log data into HDFS. Once the dataset was inside HDFS, Hive was used to perform the analysis.

Let’s dig deep. Mind you – The devil is in the details.

Problem 1: How Flume solved the data transfer problem
The primary use case for Flume is as a logging system that gathers a set of log files on every machine in a cluster and aggregates them to a centralized persistent HDFS store.

Flume Architecture
Flume’s typical dataflow is as follows: a Flume Agent is installed on each node of the cluster that produces log messages. The streams of log messages from every node are sent to a Flume Collector. The collectors then aggregate the streams into larger streams, which can be written efficiently to a storage tier such as HDFS.
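To make the agent-to-collector mapping concrete, here is a rough sketch of how two logical nodes might be configured through the Flume Master shell in the pre-NG (0.9.x) generation of Flume that this architecture describes. The node names, log directory, port, and HDFS path are illustrative assumptions, not values from the actual deployment.

    # Connect to one of the Flume Masters and map logical nodes to sources/sinks.
    connect flumeMaster1

    # Agent node: tail the application log directory and forward events to a collector
    # (agentDFOSink buffers to local disk if the collector is unreachable).
    exec config retailAgent1 'tailDir("/var/log/retail/")' 'agentDFOSink("collector1", 35853)'

    # Collector node: receive the agent streams and roll them into dated HDFS directories.
    exec config collector1 'collectorSource(35853)' 'collectorSink("hdfs://namenode/flume/retail/%Y-%m-%d/", "retail-")'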

Problem 2: Analysis using Hive

Hive Implementation
Hive uses “Schema on Read”, unlike a traditional database, which uses “Schema on Write”. Schema on Write means that a table’s schema is enforced at data load time: if the data being loaded doesn’t conform to the schema, it is rejected. This mechanism can slow down loading. Schema on Read, by contrast, doesn’t verify the data when it is loaded, but only when a query is issued. For this reason, once the dataset is in HDFS, moving it into the Hive-controlled namespace is usually instantaneous. Hive can also analyze datasets left in place in HDFS or on local storage, but the preferred approach is to move the entire dataset into the Hive-controlled namespace (default location: /user/hive/warehouse in HDFS) to enable additional query optimizations.
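As a minimal sketch of the two options, assuming the Flume collectors write into an HDFS directory such as /flume/retail (the table and path names below are illustrative):

    -- Option 1: query the data in place, without moving it, via an external table
    CREATE EXTERNAL TABLE raw_logs_ext (line STRING)
    LOCATION '/flume/retail';

    -- Option 2 (preferred here): move the files into the Hive warehouse.
    -- LOAD DATA INPATH is an HDFS move, not a copy, which is why it is near-instantaneous.
    -- (The raw_logs table itself is defined with a RegexSerDe in the next section.)
    LOAD DATA INPATH '/flume/retail/week_22' INTO TABLE raw_logs;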

When reading log files, the simplest recommended approach during Hive table creation is to use a RegexSerDe. It uses regular expressions (regex) to serialize and deserialize rows: it deserializes the data using a regex and extracts the capture groups as columns, and it can also serialize a row object using a format string.
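A sketch of such a table definition, assuming tab-separated log lines with four fields (the column names and regex are illustrative; the contrib RegexSerDe may also require adding the hive-contrib jar with ADD JAR):

    CREATE TABLE raw_logs (
      txn_time  STRING,
      store_id  STRING,
      sku       STRING,
      amount    STRING    -- all columns must be STRING when using RegexSerDe
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
      "input.regex" = "([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)\\t([^\\t]*)",
      "output.format.string" = "%1$s %2$s %3$s %4$s"
    )
    STORED AS TEXTFILE;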

Caveats:

  • With RegexSerDe, all columns have to be strings. Use CAST(a AS INT) to convert a column to another type at query time.
  • While moving data from HDFS into Hive, DO NOT use the keyword OVERWRITE; it would replace the data already loaded into the table. (Both caveats are illustrated in the sketch after this list.)
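For example, continuing with the illustrative raw_logs table from above:

    -- CAST string columns to numeric types at query time
    SELECT store_id, SUM(CAST(amount AS INT)) AS total_amount
    FROM raw_logs
    GROUP BY store_id;

    -- Append the next batch of files; omitting OVERWRITE keeps the data already in the table
    LOAD DATA INPATH '/flume/retail/week_23' INTO TABLE raw_logs;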

Solution Architecture using Flume + Hive

Flume + Hive Architecture
The merchandise details, user information, time of transaction, area/city/state information, coupon codes (if any), customer data, and other related details were collected and aggregated from various backend servers.

As mentioned earlier, the dataset to be analyzed was 13TB. With the Hadoop default replication factor of 3, it would require 13TB × 3 = 39TB of storage capacity. After a couple of iterations on a smaller sample dataset and subsequent performance tuning, we finalized the cluster configuration and capacity below –

  • 45 virtual instances, each with
    • 64-bit OS platform
    • 12 GB RAM
    • 4-6 CPU cores
    • 1 TB Storage

Flume configuration

The following Flume parameters were configured (sample values; a sketch of the corresponding flume-site.xml follows this list) –

  • flume.event.max.size.bytes uses the default value of 32KB.
  • flume.agent.logdir was changed to point to an appropriate HDFS directory
  • flume.master.servers: 3 Flume Masters – flumeMaster1, flumeMaster2, flumeMaster3
  • flume.master.store uses the default value – zookeeper
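A minimal sketch of how these properties might appear in conf/flume-site.xml; the hostnames and HDFS path are illustrative, and the two parameters left at their defaults are only noted in a comment:

    <configuration>
      <property>
        <name>flume.master.servers</name>
        <value>flumeMaster1,flumeMaster2,flumeMaster3</value>
      </property>
      <property>
        <name>flume.agent.logdir</name>
        <!-- illustrative HDFS directory, per the parameter change described above -->
        <value>hdfs://namenode:8020/flume/agent-logs</value>
      </property>
      <!-- defaults retained: flume.event.max.size.bytes (32KB), flume.master.store (zookeeper) -->
    </configuration>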

Hive configuration

The following Hive parameters were configured (sample) –

  • javax.jdo.option.ConnectionURL
  • javax.jdo.option.ConnectionDriverName: set the value to “com.mysql.jdbc.Driver”
  • javax.jdo.option.ConnectionUserName
  • javax.jdo.option.ConnectionPassword

By default, Hive metadata is stored in an embedded Derby database, which allows only one user to issue queries at a time. This is not ideal for production use, so Hive was configured to use MySQL as the metastore database in this case.
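A minimal sketch of these settings in hive-site.xml, assuming a MySQL metastore database named hive_metastore on a host called metastore-host (both names are illustrative):

    <configuration>
      <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://metastore-host:3306/hive_metastore?createDatabaseIfNotExist=true</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
      </property>
      <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>*****</value>
      </property>
    </configuration>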

With the Hadoop-based system in place, the weekly log transfer time came down to roughly 3 hours, and query times also improved significantly.

Some of the schema tables present in the final design were facts, products, customers, categories, locations, and payments. Some sample Hive queries executed as part of the analysis are as follows –

  • Count the number of transactions
    • SELECT COUNT(*) FROM facts;
  • Count the number of distinct users by gender
    • SELECT gender, COUNT(DISTINCT customer_id) FROM customers GROUP BY gender;

Hive supports only equality joins (inner and outer joins, semi joins, and map joins). It does not support non-equality join conditions, as these are very difficult to express as a MapReduce job. More than two tables can be joined in a single Hive query; a sketch of such a multi-table join follows the examples below.

  • List the category to which the product belongs
    • SELECT products.product_name, products.product_id, categories.category_name FROM products JOIN categories ON (products.product_category_id = categories.category_id);
  • Count of the number of transactions from each location
    • SELECT locations.location_name, COUNT(DISTINCT facts.payment_id) FROM facts JOIN locations ON (facts.location_id = locations.location_id) GROUP BY locations.location_name;
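As an illustration of joining more than two tables, here is a hedged sketch that counts transactions by category and location. It assumes the facts table carries a product_id column, which is not confirmed by the schema details above:

    SELECT categories.category_name, locations.location_name,
           COUNT(DISTINCT facts.payment_id) AS transactions
    FROM facts
      JOIN products   ON (facts.product_id = products.product_id)           -- facts.product_id is an assumed column
      JOIN categories ON (products.product_category_id = categories.category_id)
      JOIN locations  ON (facts.location_id = locations.location_id)
    GROUP BY categories.category_name, locations.location_name;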

Interesting trends/analysis using Hive

Some of the interesting trends that were observed from this dataset using Hive were:

  • There was healthy year-over-year (YoY) growth across all retail product categories.
  • Health & Beauty Products saw the highest growth rate at 72%, closely followed by Food Products (65%) and Entertainment (57.8%).
  • Northern states spent more on Health & Beauty Products, while the southern states spent more on Books and Food Products.
  • Two metros took the top spots for purchases of Fashion & Apparel.
  • A very interesting and out-of-the-ordinary observation was that men shop more than women! Though the difference isn’t large, it is quite surprising. (Note: this could also be because, when couples shop together, the man tends to pay the bill!)

This is just one illustration of the possibilities of Big Data analysis, and we will try to cover more in the coming articles. Stay tuned!
