Chapter 8. A Relational View on Data with Hive
MapReduce is a powerful paradigm that enables complex data processing capable of revealing valuable insights. However, it does require a different mindset and some training and experience in breaking processing and analytics into a series of map and reduce steps. Several products are built atop Hadoop to provide higher-level or more familiar views of the data held within HDFS. This chapter will introduce one of the most popular of these tools, Hive.
In this chapter, we will cover:
- What Hive is and why you may want to use it
- How to install and configure Hive
- Using Hive to perform SQL-like analysis of the UFO data set
- How Hive can approximate common features of a relational database such as joins and views
- How to efficiently use Hive across very large data sets
- How Hive allows the incorporation of user-defined functions into its queries
- How Hive complements another common tool, Pig
Overview of Hive
Hive is a data warehouse that uses MapReduce to analyze data stored on HDFS. In particular, it provides a query language called HiveQL that closely resembles the common Structured Query Language (SQL) standard.
Why use Hive?
As we saw earlier, one large benefit of Hadoop Streaming is how it allows faster turnaround in the development of MapReduce jobs. Hive takes this a step further. Instead of providing a way of more quickly developing map and reduce tasks, it offers a query language based on the industry-standard SQL. Hive takes these HiveQL statements and automatically translates each query into one or more MapReduce jobs. It then executes the overall MapReduce program and returns the results to the user. Whereas Hadoop Streaming reduces the required code/compile/submit cycle, Hive removes it entirely and instead requires only the composition of HiveQL statements.
This interface to Hadoop not only accelerates the time required to produce results from data analysis, but also significantly broadens who can use Hadoop and MapReduce. Instead of requiring software development skills, anyone with a familiarity with SQL can use Hive.
The consequence of these attributes is that Hive is often used as a tool for business and data analysts to perform ad hoc queries on the data stored on HDFS. Direct use of MapReduce requires map and reduce tasks to be written before a job can be executed, which imposes a delay between the idea of a possible query and its execution. With Hive, the data analyst can work on refining HiveQL queries without the ongoing involvement of a software developer. There are of course operational and practical limitations (a badly written query will be inefficient regardless of technology) but the broad principle is compelling.
Thanks, Facebook!
Just as we earlier thanked Google, Yahoo!, and Doug Cutting for their contributions to Hadoop and the technologies that inspired it, it is to Facebook that we must now direct thanks.
Hive was developed by the Facebook Data team and, after being used internally, it was contributed to the Apache Software Foundation and made freely available as open source software. Its homepage is http://hive.apache.org.
Setting up Hive
In this section, we will walk through downloading, installing, and configuring Hive.
Prerequisites
Unlike Hadoop, there are no Hive masters, slaves, or nodes. Hive runs as a client application that processes HiveQL queries, converts them into MapReduce jobs, and submits these to a Hadoop cluster.
Although there is a mode suitable for small jobs and development usage, the usual situation is that Hive will require an existing functioning Hadoop cluster.
Just as other Hadoop clients don't need to be executed on the actual cluster nodes, Hive can be executed on any host where the following are true:
- Hadoop is installed on the host (even if no processes are running)
- The HADOOP_HOME environment variable is set and points to the location of the Hadoop installation
- The ${HADOOP_HOME}/bin directory is added to the system or user path
Getting Hive
You should download the latest stable Hive version from http://hive.apache.org/releases.html.
The Hive getting started guide at http://cwiki.apache.org/confluence/display/Hive/GettingStarted will give recommendations on version compatibility, but as a general principle, you should expect the most recent stable versions of Hive, Hadoop, and Java to work together.
Time for action – installing Hive
Let's now set up Hive so we can start using it in action.
- Download the latest stable version of Hive and move it to the location to which you wish to have it installed:
$ mv hive-0.8.1.tar.gz /usr/local
- Uncompress the package:
$ tar -xzf hive-0.8.1.tar.gz
- Set the HIVE_HOME variable to the installation directory:
$ export HIVE_HOME=/usr/local/hive
- Add the Hive home directory to the path variable:
$ export PATH=${HIVE_HOME}/bin:${PATH}
- Create directories required by Hive on HDFS:
$ hadoop fs -mkdir /tmp
$ hadoop fs -mkdir /user/hive/warehouse
- Make both of these directories group writeable:
$ hadoop fs -chmod g+w /tmp
$ hadoop fs -chmod g+w /user/hive/warehouse
- Try to start Hive:
$ hive
You will receive the following response:
Logging initialized using configuration in jar:file:/opt/hive-0.8.1/lib/hive-common-0.8.1.jar!/hive-log4j.properties
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201203031500_480385673.txt
hive>
- Exit the Hive interactive shell:
hive> quit;
What just happened?
After downloading the latest stable Hive release, we copied it to the desired location and uncompressed the archive file. This created a directory, hive-<version>.
Just as we previously defined HADOOP_HOME and added the bin directory within that installation to the path variable, we then did the same with HIVE_HOME and its bin directory.
Note
Remember that to avoid having to set these variables every time you log in, add them to your shell login script or to a separate configuration script that you source when you want to use Hive.
We then created two directories on HDFS that Hive requires and changed their attributes to make them group writeable. The /tmp directory is where Hive will, by default, write transient data created during query execution and will also place output data in this location. The /user/hive/warehouse directory is where Hive will store the data that is written into its tables.
After all this setup, we run the hive command and a successful installation will give output similar to the one shown previously. Running the hive command with no arguments enters an interactive shell; the hive> prompt is analogous to the sql> or mysql> prompts familiar from relational database interactive tools.
We then exit the interactive shell by typing quit;. Note the trailing semicolon; HiveQL is, as mentioned, very similar to SQL and follows the convention that all commands must be terminated by a semicolon. Pressing Enter without a semicolon allows commands to be continued on subsequent lines.
Using Hive
With our Hive installation, we will now import and analyze the UFO data set introduced in Chapter 4, Developing MapReduce Programs.
When importing any new data into Hive, there is generally a three-stage process:
- Create the specification of the table into which the data is to be imported.
- Import the data into the created table.
- Execute HiveQL queries against the table.
This process should look very familiar to those with experience with relational databases. Hive gives a structured query view of our data and to enable that, we must first define the specification of the table's columns and import the data into the table before we can execute any queries.
Note
We assume a general level of familiarity with SQL and will focus more on how to get things done with Hive than on explaining particular SQL constructs in detail. A SQL reference may be handy for those with little familiarity with the language, though we will make sure you know what each statement does, even if the details require deeper SQL knowledge.
Time for action – creating a table for the UFO data
Perform the following steps to create a table for the UFO data:
- Start the Hive interactive shell:
$ hive
- Create a table for the UFO data set, splitting the statement across multiple lines for easy readability:
hive> CREATE TABLE ufodata(sighted STRING, reported STRING, sighting_location STRING,
    > shape STRING, duration STRING, description STRING COMMENT 'Free text description')
    > COMMENT 'The UFO data set.' ;
You should see the following lines once you are done:
OK
Time taken: 0.238 seconds
- List all existing tables:
hive> show tables;
You will receive the following output:
OK
ufodata
Time taken: 0.156 seconds
- Show tables matching a regular expression:
hive> show tables '.*data';
You will receive the following output:
OK
ufodata
Time taken: 0.065 seconds
- Validate the table specification:
hive> describe ufodata;
You will receive the following output:
OK
sighted string
reported string
sighting_location string
shape string
duration string
description string Free text description
Time taken: 0.086 seconds
- Display a more detailed description of the table:
hive> describe extended ufodata;
You will receive the following output:
OK
sighted string
reported string
…
Detailed Table Information Table(tableName:ufodata, dbName:default, owner:hadoop, createTime:1330818664, lastAccessTime:0, retention:0, …
…location:hdfs://head:9000/user/hive/warehouse/ufodata, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1,
What just happened?
After starting the interactive Hive interpreter, we used the CREATE TABLE command to define the structure of the UFO data table. As with standard SQL, this requires that each column in the table has a name and a datatype. HiveQL also offers optional comments on each column and on the overall table, as shown previously where we added one column comment and one table comment.
For the UFO data, we use STRING as the data type; HiveQL, as with SQL, supports a variety of datatypes (a short illustrative declaration follows the list):
- Boolean types: BOOLEAN
- Integer types: TINYINT, INT, BIGINT
- Floating point types: FLOAT, DOUBLE
- Textual types: STRING
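As a quick illustration, a table mixing several of these types might be declared as follows; this table is purely hypothetical and is not used elsewhere in the chapter:

CREATE TABLE sighting_summary(
    is_hoax BOOLEAN COMMENT 'Flag set by a hypothetical review process',
    witness_count INT,
    duration_minutes DOUBLE,
    shape STRING
) ;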
After creating the table, we use the SHOW TABLES statement to verify that the table has been created. This command lists all tables and in this case, our new UFO table is the only one in the system.
We then use a variant of SHOW TABLES that takes an optional Java regular expression to match against the table name. In this case, the output is identical to the previous command, but in systems with a large number of tables (especially when you do not know the exact name) this variant can be very useful.
Note
We have seen the table exists but we have not validated whether it was created properly. We next do this by using the DESCRIBE TABLE command to display the specification of the named table. We see that all is as specified (though note the table comment is not shown by this command) and then use the DESCRIBE TABLE EXTENDED variant to get much more information about the table.
We have omitted much of this final output, though a few points of interest are present. Note that the input format is specified as TextInputFormat; by default, Hive will assume any HDFS files inserted into a table are stored as text files.
We also see that the table data will be stored in a directory under the /user/hive/warehouse HDFS directory we created earlier.
Tip
A note on case sensitivity:
HiveQL, as with SQL, is not case sensitive in terms of keywords, columns, or table names. By convention, SQL statements use uppercase for SQL language keywords and we will generally follow this when using HiveQL within files, as shown later. However, when typing interactive commands, we will frequently take the line of least resistance and use lowercase.
Time for action – inserting the UFO data
Now that we have created a table, let us load the UFO data into it.
- Copy the UFO data file onto HDFS:
$ hadoop fs -put ufo.tsv /tmp/ufo.tsv
- Confirm that the file was copied:
$ hadoop fs -ls /tmp
You will receive the following response:
Found 2 items
drwxrwxr-x   - hadoop supergroup          0 … 14:52 /tmp/hive-hadoop
-rw-r--r--   3 hadoop supergroup   75342464 2012-03-03 16:01 /tmp/ufo.tsv
- Enter the Hive interactive shell:
$ hive
- Load the data from the previously copied file into the ufodata table:
hive> LOAD DATA INPATH '/tmp/ufo.tsv' OVERWRITE INTO TABLE ufodata;
You will receive the following response:
Loading data to table default.ufodata
Deleted hdfs://head:9000/user/hive/warehouse/ufodata
OK
Time taken: 5.494 seconds
- Exit the Hive shell:
hive> quit;
- Check the location from which we copied the data file:
$ hadoop fs -ls /tmp
You will receive the following response:
Found 1 items
drwxrwxr-x   - hadoop supergroup          0 … 16:10 /tmp/hive-hadoop
What just happened?
We first copied onto HDFS the tab-separated file of UFO sightings used previously in Chapter 4, Developing MapReduce Programs. After validating the file's presence on HDFS, we started the Hive interactive shell and used the LOAD DATA command to load the file into the ufodata table.
Because we are using a file already on HDFS, the path was specified by INPATH alone. We could have loaded directly from a file on the local filesystem (obviating the need for the prior explicit HDFS copy) by using LOCAL INPATH.
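For reference, a load from the local filesystem is only a minimal variation on the same statement; the path below is illustrative and assumes the ufo.tsv file sits in the local user's home directory:

LOAD DATA LOCAL INPATH '/home/hadoop/ufo.tsv'
OVERWRITE INTO TABLE ufodata ;

With LOCAL, the file is copied from the local filesystem into the Hive warehouse, so the original local file is left untouched.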
We specified the OVERWRITE modifier, which deletes any existing data in the table before loading the new data. This should obviously be used with care; as can be seen from the output of the command, the directory holding the existing table data is removed when OVERWRITE is used.
Note that the command took only a little over five seconds to execute; as we will see shortly, loading a file that is already on HDFS is essentially a file move plus some metadata updates, so most of that time is Hive start-up overhead rather than data processing.
Note
Though we specified an explicit file in this example, it is possible to load multiple files with a single statement by specifying a directory as the INPATH location; in such a case, all files within the directory will be loaded into the table.
After exiting the Hive shell, we look again at the directory into which we copied the data file and find it is no longer there. If a LOAD statement is given a path to data on HDFS, it will not simply copy this into /user/hive/warehouse, but will move it there instead. If you want to analyze data on HDFS that is used by other applications, then either create a copy or use the EXTERNAL mechanism that will be described later.
Validating the data
Now that we have loaded the data into our table, it is good practice to do some quick validating queries to confirm all is as expected. Sometimes our initial table definition turns out to be incorrect.
Time for action – validating the table
The easiest way to do some initial validation is to perform some summary queries to validate the import. This is similar to the types of activities for which we used Hadoop Streaming in Chapter 4, Developing MapReduce Programs.
- Instead of using the Hive shell, pass the following HiveQL to the hive command-line tool to count the number of entries in the table:
$ hive -e "select count(*) from ufodata;"
You will receive the following response:
Total MapReduce jobs = 1
Launching Job 1 out of 1
…
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2012-03-03 16:15:15,510 Stage-1 map = 0%, reduce = 0%
2012-03-03 16:15:21,552 Stage-1 map = 100%, reduce = 0%
2012-03-03 16:15:30,622 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201202281524_0006
MapReduce Jobs Launched:
Job 0: Map: 1 Reduce: 1 HDFS Read: 75416209 HDFS Write: 6 SUCESS
Total MapReduce CPU Time Spent: 0 msec
OK
61393
Time taken: 28.218 seconds
- Display a sample of five values for the sighted column:
$ hive -e "select sighted from ufodata limit 5;"
You will receive the following response:
Total MapReduce jobs = 1
Launching Job 1 out of 1
…
OK
19951009 19951009 Iowa City, IA Man repts. witnessing "flash, followed by a classic UFO, w/ a tailfin at back." Red color on top half of tailfin. Became triangular.
19951010 19951011 Milwaukee, WI 2 min. Man on Hwy 43 SW of Milwaukee sees large, bright blue light streak by his car, descend, turn, cross road ahead, strobe. Bizarre!
19950101 19950103 Shelton, WA Telephoned Report:CA woman visiting daughter witness discs and triangular ships over Squaxin Island in Puget Sound. Dramatic. Written report, with illustrations, submitted to NUFORC.
19950510 19950510 Columbia, MO 2 min. Man repts. son's bizarre sighting of small humanoid creature in back yard. Reptd. in Acteon Journal, St. Louis UFO newsletter.
19950611 19950614 Seattle, WA Anonymous caller repts. sighting 4 ufo's in NNE sky, 45 deg. above horizon. (No other facts reptd. No return tel. #.)
Time taken: 11.693 seconds
What just happened?
In this example, we use the hive -e option to pass HiveQL directly to the Hive tool instead of using the interactive shell. The interactive shell is useful when performing a series of Hive operations, but for simple statements it is often more convenient to pass the query string directly to the command-line tool. This also shows that Hive can be called from scripts like any other Unix tool.
Note
When using hive -e, it is not necessary to terminate the HiveQL string with a semicolon, but if you are like me, the habit is hard to break. If you want multiple commands in a single string, they must obviously be separated by semicolons.
The result of the first query is 61393, the same number of records we saw when analyzing the UFO data set previously with direct MapReduce. This tells us the entire data set was indeed loaded into the table.
We then execute a second query to select five values of the first column in the table, which should return a list of five dates. However, the output instead includes the entire record which has been loaded into the first column.
The issue is that though we relied on Hive loading our data file as a text file, we didn't take into account the separator between columns. Our file is tab separated, but Hive, by default, expects its input files to have fields separated by ASCII code 001 (Ctrl-A).
Time for action – redefining the table with the correct column separator
Let's fix our table specification as follows:
- Create the following file as commands.hql:
DROP TABLE ufodata ;
CREATE TABLE ufodata(sighted string, reported string, sighting_location string,
shape string, duration string, description string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t' ;
LOAD DATA INPATH '/tmp/ufo.tsv' OVERWRITE INTO TABLE ufodata ;
- Copy the data file onto HDFS:
$ hadoop fs -put ufo.tsv /tmp/ufo.tsv
- Execute the HiveQL script:
$ hive -f commands.hql
You will receive the following response:
OK
Time taken: 5.821 seconds
OK
Time taken: 0.248 seconds
Loading data to table default.ufodata
Deleted hdfs://head:9000/user/hive/warehouse/ufodata
OK
Time taken: 0.285 seconds
- Validate the number of rows in the table:
$ hive -e "select count(*) from ufodata;"
You will receive the following response:
OK
61393
Time taken: 28.077 seconds
- Validate the contents of the reported column:
$ hive -e "select reported from ufodata limit 5"
You will receive the following response:
OK
19951009
19951011
19950103
19950510
19950614
Time taken: 14.852 seconds
What just happened?
We introduced a third way to invoke HiveQL commands in this example. In addition to using the interactive shell or passing query strings to the Hive tool, we can have Hive read and execute the contents of a file containing a series of HiveQL statements.
We first created such a file that deletes the old table, creates a new one, and loads the data file into it.
The main differences with the table specification are the ROW FORMAT and FIELDS TERMINATED BY clauses. We need both of these as the first tells Hive that the row contains multiple delimited fields, while the second specifies the actual separator. As can be seen here, we can specify the separator both as explicit ASCII codes and as common tokens such as \t for tab.
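To make the point concrete, here is a hypothetical table definition for a pipe-delimited file; neither the table nor such a file is used elsewhere in this chapter:

CREATE TABLE pipe_delimited(event_date STRING, event_detail STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|' ;

The same pattern works with an explicit character code, for example FIELDS TERMINATED BY '\001' to name Hive's default separator directly.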
Note
Be careful with the separator specification as it must be precise and is case sensitive. Do not waste a few hours by accidentally writing \T
instead of \t
as I did recently.
Before running the script, we copy the data file onto HDFS again (the previous copy was moved into Hive's warehouse directory by the earlier LOAD statement) and then use hive -f to execute the HiveQL file.
As before, we then execute two simple SELECT statements to first count the rows in the table and then extract specific values from a named column for a small number of rows.
The overall row count is, as should be expected, the same as before, but the second statement now produces what looks like correct data, showing that the rows are now correctly being split into their constituent fields.
Hive tables – real or not?
If you look closely at the time taken by the various commands in the preceding example, you'll see a pattern which may at first seem strange. Loading data into a table takes about as long as creating the table specification, yet even a simple count of all rows takes significantly longer. The output also shows that table creation and the loading of data do not actually cause MapReduce jobs to be executed, which explains the very short execution times.
When loading data into a Hive table, the process is different from what may be expected with a traditional relational database. Although Hive copies the data file into its working directory, it does not actually process the input data into rows at that point. What it does instead is create metadata around the data which is then used by subsequent HiveQL queries.
Both the CREATE TABLE and LOAD DATA statements, therefore, do not truly create concrete table data as such; instead, they produce the metadata that will be used when Hive is generating MapReduce jobs to access the data conceptually stored in the table.
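One way to see this division of labor for yourself is Hive's EXPLAIN command, which prints the execution plan for a statement without running it. Applied to a query such as the following, it shows the MapReduce stages that Hive will generate at query time:

EXPLAIN SELECT COUNT(*) FROM ufodata ;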
Time for action – creating a table from an existing file
So far we have loaded data into Hive directly from files over which Hive effectively takes control. It is also possible, however, to create tables that model data held in files external to Hive. This can be useful when we want the ability to perform Hive processing over data written and managed by external applications or otherwise required to be held in directories outside the Hive warehouse directory. Such files are not moved into the Hive warehouse directory or deleted when the table is dropped.
- Save the following to a file called states.hql:
CREATE EXTERNAL TABLE states(abbreviation string, full_name string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/tmp/states' ;
- Copy the data file onto HDFS and confirm its presence afterwards:
$ hadoop fs -put states.txt /tmp/states/states.txt
$ hadoop fs -ls /tmp/states
You will receive the following response:
Found 1 items
-rw-r--r--   3 hadoop supergroup        654 2012-03-03 16:54 /tmp/states/states.txt
- Execute the HiveQL script:
$ hive -f states.hql
You will receive the following response:
Logging initialized using configuration in jar:file:/opt/hive-0.8.1/lib/hive-common-0.8.1.jar!/hive-log4j.properties
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201203031655_1132553792.txt
OK
Time taken: 3.954 seconds
OK
Time taken: 0.594 seconds
- Check the source data file:
$ hadoop fs -ls /tmp/states
You will receive the following response:
Found 1 items
-rw-r--r--   3 hadoop supergroup        654 … /tmp/states/states.txt
- Execute a sample query against the table:
$ hive -e "select full_name from states where abbreviation like 'CA'"
You will receive the following response:
Logging initialized using configuration in jar:file:/opt/hive-0.8.1/lib/hive-common-0.8.1.jar!/hive-log4j.properties
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201203031655_410945775.txt
Total MapReduce jobs = 1
...
OK
California
Time taken: 15.75 seconds
What just happened?
The HiveQL statement to create an external table only differs slightly from the forms of CREATE TABLE we used previously. The EXTERNAL keyword specifies that the table exists in resources that Hive does not control, and the LOCATION clause specifies where the source file or directory is to be found.
After creating the HiveQL script, we copied the source file onto HDFS. For this table, we used the data file from Chapter 4, Developing MapReduce Programs, which maps U.S. states to their common two-letter abbreviation.
After confirming the file was in the expected location on HDFS, we executed the query to create the table and checked the source file again. Unlike previous table creations that moved the source file into the /user/hive/warehouse directory, the states.txt file is still in the HDFS location into which it was copied.
Finally, we executed a query against the table to confirm it was populated with the source data, and the expected result confirms this. This highlights an additional difference with this form of CREATE TABLE; for our previous non-external tables, the table creation statement does not ingest any data into the table, and a subsequent LOAD DATA or (as we'll see later) INSERT statement performs the actual table population. With table definitions that include the LOCATION specification, we can create the table and ingest data in a single statement.
We now have two tables in Hive; the larger table with UFO sighting data and a smaller one mapping U.S. state abbreviations to their full names. Wouldn't it be a useful combination to use data from the second table to enrich the location column in the former?
Time for action – performing a join
Joins are a very frequently used tool in SQL, though sometimes appear a little intimidating to those new to the language. Essentially a join allows rows in multiple tables to be logically combined together based on a conditional statement. Hive has rich support for joins which we will now examine.
- Create the following as join.hql:
SELECT t1.sighted, t2.full_name
FROM ufodata t1 JOIN states t2
ON (LOWER(t2.abbreviation) = LOWER(SUBSTR( t1.sighting_location, (LENGTH(t1.sighting_location)-1))))
LIMIT 5 ;
- Execute the query:
$ hive -f join.hql
You will receive the following response:
OK
20060930 Alaska
20051018 Alaska
20050707 Alaska
20100112 Alaska
20100625 Alaska
Time taken: 33.255 seconds
What just happened?
The actual join query is relatively straightforward; we want to extract the sighted date and location for a series of records, but instead of the raw location field, we wish to map this into the full state name. The HiveQL file we created performs such a query. The join itself is specified by the standard JOIN keyword and the matching condition is contained in the ON clause.
Things are complicated by a restriction of Hive in that it only supports equijoins, that is, those where the ON clause contains only equality checks. It is not possible to specify a join condition using operators such as >, >=, <, or, as we would have preferred to use here, the LIKE keyword.
Instead, therefore, we have an opportunity to introduce several of Hive's built-in functions, in particular, those to convert a string to lowercase (LOWER), to extract a substring from a string (SUBSTR), and to return the number of characters in a string (LENGTH).
We know that most location entries are of the form "city, state_abbreviation", so we use SUBSTR to extract the last two characters of the string, using LENGTH to calculate the starting index. We convert both the state abbreviation and the extracted string to lowercase via LOWER because we cannot assume that all entries in the sighting table will correctly use uniform capitalization.
After executing the script, we get the expected sample lines of output that indeed include the sighting date and full state name instead of the abbreviation.
Note the use of the LIMIT clause that simply constrains how many output rows will be returned from the query. This is also an indication that HiveQL is most similar to SQL dialects such as those found in open source databases such as MySQL.
This example shows an inner join; Hive also supports left and right outer joins as well as left semi joins. There are a number of subtleties around the use of joins in Hive (such as the aforementioned equijoin restriction) and you should really read through the documentation on the Hive homepage if you are likely to use joins, especially when using very large tables.
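The outer join variants use the same syntax; the following sketch, which we do not run in this chapter, keeps every sighting row and returns NULL for full_name where the location does not resolve to a known state abbreviation:

SELECT t1.sighted, t2.full_name
FROM ufodata t1 LEFT OUTER JOIN states t2
ON (LOWER(t2.abbreviation) = LOWER(SUBSTR( t1.sighting_location, (LENGTH(t1.sighting_location)-1))))
LIMIT 5 ;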
Note
This is not a criticism of Hive alone; joins are incredibly powerful tools but it is probably fair to say that badly written joins or those created in ignorance of critical constraints have brought more relational databases to a grinding halt than any other type of SQL query.
Have a go hero – improve the join to use regular expressions
As well as the string functions we used previously, Hive also has functions such as RLIKE and REGEXP_EXTRACT that provide direct support for Java-like regular expression manipulation. Rewrite the preceding join specification using regular expressions to make a more accurate and elegant join statement.
Hive and SQL views
Another powerful SQL feature supported by Hive is views. These are useful when, instead of a static table, the contents of a logical table are specified by a SELECT statement, and subsequent queries can then be executed against this dynamic view (hence the name) of the underlying data.
Time for action – using views
We can use views to hide the underlying query complexity such as the previous join example. Let us now create a view to do just that.
- Create the following as view.hql:
CREATE VIEW IF NOT EXISTS usa_sightings (sighted, reported, shape, state)
AS SELECT t1.sighted, t1.reported, t1.shape, t2.full_name
FROM ufodata t1 JOIN states t2
ON (LOWER(t2.abbreviation) = LOWER(SUBSTR( t1.sighting_location, (LENGTH(t1.sighting_location)-1)))) ;
- Execute the script:
$ hive -f view.hql
You will receive the following response:
Logging initialized using configuration in jar:file:/opt/hive-0.8.1/lib/hive-common-0.8.1.jar!/hive-log4j.properties
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201203040557_1017700649.txt
OK
Time taken: 5.135 seconds
- Execute the script again:
$ hive -f view.hql
You will receive the following response:
Logging initialized using configuration in jar:file:/opt/hive-0.8.1/lib/hive-common-0.8.1.jar!/hive-log4j.properties
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201203040557_851275946.txt
OK
Time taken: 4.828 seconds
- Execute a test query against the view:
$ hive -e "select count(state) from usa_sightings where state = 'California'"
You will receive the following response:
Logging initialized using configuration in jar:file:/opt/hive-0.8.1/lib/hive-common-0.8.1.jar!/hive-log4j.properties
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201203040558_1729315866.txt
Total MapReduce jobs = 2
Launching Job 1 out of 2
…
2012-03-04 05:58:12,991 Stage-1 map = 0%, reduce = 0%
2012-03-04 05:58:16,021 Stage-1 map = 50%, reduce = 0%
2012-03-04 05:58:18,046 Stage-1 map = 100%, reduce = 0%
2012-03-04 05:58:24,092 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201203040432_0027
Launching Job 2 out of 2
…
2012-03-04 05:58:33,650 Stage-2 map = 0%, reduce = 0%
2012-03-04 05:58:36,673 Stage-2 map = 100%, reduce = 0%
2012-03-04 05:58:45,730 Stage-2 map = 100%, reduce = 100%
Ended Job = job_201203040432_0028
MapReduce Jobs Launched:
Job 0: Map: 2 Reduce: 1 HDFS Read: 75416863 HDFS Write: 116 SUCESS
Job 1: Map: 1 Reduce: 1 HDFS Read: 304 HDFS Write: 5 SUCESS
Total MapReduce CPU Time Spent: 0 msec.
OK
7599
Time taken: 47.03 seconds
- Delete the view:
$ hive -e "drop view usa_sightings"
You will receive the following output on your screen:
OK
Time taken: 5.298 seconds
What just happened?
We firstly created the view using the CREATE VIEW statement. This is similar to CREATE TABLE but has two main differences:
- The column definitions include only the names; the types will be determined from the underlying query
- The AS clause specifies the SELECT statement that will be used to generate the view
We use the previous join statement to generate the view, so in effect we are creating a table that has the location field normalized to the full state name without directly requiring the user to deal with how that normalization is performed.
The optional IF NOT EXISTS clause (which can also be used with CREATE TABLE) means that Hive will ignore duplicate attempts to create the view. Without this clause, repeated attempts to create the view will generate errors, which isn't always the desired behavior.
We then execute this script twice to both create the view and to demonstrate that the inclusion of the IF NOT EXISTS clause is preventing errors as we intended.
With the view created, we then execute a query against it, in this case, to simply count how many of the sightings took place in California. All our previous Hive statements that generate MapReduce jobs have only produced a single one; this query against our view requires a pair of chained MapReduce jobs. Looking at the query and the view specification, this isn't necessarily surprising; it's not difficult to imagine how the view would be realized by the first MapReduce job and its output fed to the subsequent counting query performed as the second job. As a consequence, you will also see this two-stage job take much longer than any of our previous queries.
Hive is actually smarter than this. If the outer query can be folded into the view creation, then Hive will generate and execute only one MapReduce job. Given the time taken to hand-develop a series of co-operating MapReduce jobs this is a great example of the benefits Hive can offer. Though a hand-written MapReduce job (or series of jobs) is likely to be much more efficient, Hive is a great tool for determining which jobs are useful in the first place. It is better to run a slow Hive query to determine an idea isn't as useful as hoped instead of spending a day developing a MapReduce job to come to the same conclusion.
We have mentioned that views can hide underlying complexity; this does not make that complexity free, and queries against views can therefore be slow. For large-scale production workloads, you will want to optimize the SQL and possibly remove the view entirely.
After running the query, we delete the view through the DROP VIEW statement, which demonstrates again the similarity between how HiveQL (and SQL) handle tables and views.
Handling dirty data in Hive
The observant among you may notice that the number of California sightings reported by this query is different from the number we generated in Chapter 4, Developing MapReduce Programs. Why?
Recall that before running our Hadoop Streaming or Java MapReduce jobs in Chapter 4, Developing MapReduce Programs, we had a mechanism to ignore input rows that were malformed. Then while processing the data, we used more precise regular expressions to extract the two-letter state abbreviation from the location field. However, in Hive, we did no such pre-processing and relied on quite crude mechanisms to extract the abbreviation.
For the latter, we could use some of Hive's previously mentioned functions that support regular expressions, but for the former, we would at best be forced to add complex validation WHERE clauses to many of our queries.
A frequent pattern is to instead preprocess data before it is imported into Hive, so for example, in this case, we could run a MapReduce job to remove all malformed records in the input file and another to do the normalization of the location field in advance.
Have a go hero – do it!
Write MapReduce jobs (it could be one or two) to do this pre-processing of the input data and generate a cleaned-up file more suited for direct importation into Hive. Then write a script to execute the jobs, create a Hive table, and import the new file into the table. This will also show how easily and powerfully scriptable Hadoop and Hive can be together.
Time for action – exporting query output
We have previously either loaded large quantities of data into Hive or extracted very small quantities as query results. We can also export large result sets; let us look at an example.
- Recreate the previously used view:
$ hive -f view.hql
- Create the following file as export.hql:
INSERT OVERWRITE DIRECTORY '/tmp/out'
SELECT reported, shape, state
FROM usa_sightings
WHERE state = 'California' ;
- Execute the script:
$ hive -f export.hql
You will receive the following response:
2012-03-04 06:20:44,571 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201203040432_0029
Moving data to: /tmp/out
7599 Rows loaded to /tmp/out
MapReduce Jobs Launched:
Job 0: Map: 2 Reduce: 1 HDFS Read: 75416863 HDFS Write: 210901 SUCESS
Total MapReduce CPU Time Spent: 0 msec
OK
Time taken: 46.669 seconds
- Look in the specified output directory:
$ hadoop fs -ls /tmp/out
You will receive the following response:
Found 1 items
-rw-r--r--   3 hadoop supergroup     210901 … /tmp/out/000000_1
- Examine the output file:
$ hadoop fs -cat /tmp/out/000000_1 | head
You will receive the following output on your screen:
20021014_ light_California
20050224_ other_California
20021001_ egg_California
20030527_ sphere_California
20050813_ light_California
20040701_ other_California
20031007_ light_California
What just happened?
After reusing the previous view, we created our HiveQL script using the INSERT OVERWRITE DIRECTORY command. This, as the name suggests, places the results of the subsequent statement into the specified location. The OVERWRITE modifier is again optional and simply determines whether any existing content in the location is to be removed first. The INSERT command is followed by a SELECT statement which produces the data to be written to the output location. In this example, we use a query on our previously created view, which you will recall is built atop a join, demonstrating how the query here can be arbitrarily complex.
There is an additional optional LOCAL modifier for occasions when the output data is to be written to the local filesystem of the host running the Hive command instead of HDFS.
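A minimal sketch of that variant, using an illustrative local path, would look like the following:

INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ufo-local'
SELECT reported, shape, state
FROM usa_sightings
WHERE state = 'California' ;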
When we run the script, the MapReduce output is mostly as we have come to expect but with the addition of a line stating how many rows have been exported to the specified output location.
After running the script, we check the output directory and see if the result file is there and when we look at it, the contents are as we would expect.
Note
Just as Hive's default separator for text file input is ASCII code 001 (Ctrl-A), it also uses this as the default separator for output files, as shown in the preceding example.
The INSERT command can also be used to populate one table with the results of a query on others, and we will look at that next. First, we need to explain a concept we will use at the same time.
Partitioning the table
We mentioned earlier that badly written joins have a long and disreputable history of causing relational databases to spend huge amounts of time grinding through unnecessary work. A similar sad tale can be told of queries that perform full table scans (visiting every row in the table) instead of using indices that allow direct access to rows of interest.
For data stored on HDFS and mapped into a Hive table, the default situation almost demands full table scans. With no way of segmenting data into a more organized structure that allows processing to apply only to the data subset of interest, Hive is forced to process the entire data set. For our UFO file of approximately 70 MB, this really is not a problem as we see the file processed in tens of seconds. However, what if it was a thousand times larger?
As with traditional relational databases, Hive allows tables to be partitioned based on the values of virtual columns and for these values to then be used in query predicates later.
In particular, when a table is created, it can have one or more partition columns and when loading data into the table, the specified values for these columns will determine the partition into which the data is written.
The most common partitioning strategy for tables that see lots of data ingested on a daily basis is for the partition column to be the date. Future queries can then be constrained to process only that data contained within a particular partition. Under the covers, Hive stores each partition in its own directory and files, which is how it can then apply MapReduce jobs only on the data of interest. Through the use of multiple partition columns, it is possible to create a rich hierarchical structure and for large tables with queries that require only small subsets of data it is worthwhile spending some time deciding on the optimal partitioning strategy.
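As a sketch of the date-based approach just described (the table name, file path, and date below are illustrative only), the pattern typically looks like the following; note how the query's WHERE clause names only the partition column:

CREATE TABLE daily_sightings(sighting_location STRING, shape STRING)
PARTITIONED BY (report_date STRING) ;

LOAD DATA INPATH '/tmp/sightings-20120303.tsv'
INTO TABLE daily_sightings PARTITION (report_date = '2012-03-03') ;

SELECT COUNT(*) FROM daily_sightings WHERE report_date = '2012-03-03' ;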
For our UFO data set, we will use the year of the sighting as the partition value but we have to use a few less common features to make it happen. Hence, after this introduction, let us now make some partitions!
Time for action – making a partitioned UFO sighting table
We will create a new table for the UFO data to demonstrate the usefulness of partitioning.
- Save the following query as createpartition.hql:
CREATE TABLE partufo(sighted string, reported string, sighting_location string,
shape string, duration string, description string)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;
- Save the following query as insertpartition.hql:
SET hive.exec.dynamic.partition=true ;
SET hive.exec.dynamic.partition.mode=nonstrict ;
INSERT OVERWRITE TABLE partufo partition (year)
SELECT sighted, reported, sighting_location, shape, duration, description,
SUBSTR(TRIM(sighted), 1, 4) FROM ufodata ;
- Create the partitioned table:
$ hive -f createpartition.hql
You will receive the following response:
Logging initialized using configuration in jar:file:/opt/hive-0.8.1/lib/hive-common-0.8.1.jar!/hive-log4j.properties
Hive history file=/tmp/hadoop/hive_job_log_hadoop_201203101838_17331656.txt
OK
Time taken: 4.754 seconds
- Examine the created table:
$ hive -e "describe partufo;"
You will receive the following response:
OK
sighted string
reported string
sighting_location string
shape string
duration string
description string
year string
Time taken: 4.704 seconds
- Populate the table:
$ hive -f insertpartition.hql
You will see the following lines on the screen:
Total MapReduce jobs = 2
…
…
Ended Job = job_201203040432_0041
Ended Job = 994255701, job is filtered out (removed at runtime).
Moving data to: hdfs://head:9000/tmp/hive-hadoop/hive_2012-03-10_18-38-36_380_1188564613139061024/-ext-10000
Loading data to table default.partufo partition (year=null)
Loading partition {year=1977}
Loading partition {year=1880}
Loading partition {year=1975}
Loading partition {year=2007}
Loading partition {year=1957}
…
Table default.partufo stats: [num_partitions: 100, num_files: 100, num_rows: 0, total_size: 74751215, raw_data_size: 0]
61393 Rows loaded to partufo
…
OK
Time taken: 46.285 seconds
- Execute a count command against a partition:
$ hive -e "select count(*) from partufo where year = '1989'"
You will receive the following response:
OK
249
Time taken: 26.56 seconds
- Execute a similar query on the non-partitioned table:
$ hive –e "select count(*) from ufodata where sighted like '1989%'"
You will receive the following response:
OK
249
Time taken: 28.61 seconds
- List the contents of the Hive directory housing the partitioned table:
$ hadoop fs -ls /user/hive/warehouse/partufo
You will receive the following response:
Found 100 items
drwxr-xr-x   - hadoop supergroup          0 2012-03-10 18:38 /user/hive/warehouse/partufo/year=0000
drwxr-xr-x   - hadoop supergroup          0 2012-03-10 18:38 /user/hive/warehouse/partufo/year=1400
drwxr-xr-x   - hadoop supergroup          0 2012-03-10 18:38 /user/hive/warehouse/partufo/year=1762
drwxr-xr-x   - hadoop supergroup          0 2012-03-10 18:38 /user/hive/warehouse/partufo/year=1790
drwxr-xr-x   - hadoop supergroup          0 2012-03-10 18:38 /user/hive/warehouse/partufo/year=1860
drwxr-xr-x   - hadoop supergroup          0 2012-03-10 18:38 /user/hive/warehouse/partufo/year=1864
drwxr-xr-x   - hadoop supergroup          0 2012-03-10 18:38 /user/hive/warehouse/partufo/year=1865
What just happened?
We created two HiveQL scripts for this example. The first of these creates the new partitioned table. As we can see, it looks very much like the previous CREATE TABLE statements; the difference is in the additional PARTITIONED BY clause.
After we execute this script, we describe the table and see that from a HiveQL perspective the table appears just like the previous ufodata table but with the addition of an extra column for the year. This allows the column to be treated as any other when it comes to specifying conditions in WHERE clauses, even though the column data does not actually exist in the on-disk data files.
We next execute the second script which performs the actual loading of data into the partitioned table. There are several things of note here.
Firstly, we see that the INSERT command can be used with tables just as we previously did with directories. The INSERT statement has a specification of where the data is to go, and a subsequent SELECT statement gathers the required data from existing tables or views.
The partitioning mechanism used here takes advantage of a relatively new feature in Hive, dynamic partitions. In most cases, the partition clause in this statement would include an explicit value for the year column. That would work if we were uploading a day's data into a daily partition, but it isn't suitable for our type of data file where the various rows should be inserted into a variety of partitions. By simply specifying the column name with no value, the partition name will be generated automatically from the value of the year column returned by the SELECT statement.
This hopefully explains the strange final clause in the SELECT statement; after specifying all the standard columns from ufodata, we add an expression that extracts a string containing the first four characters of the sighted column. Because the partitioned table sees the year partition column as its seventh column, this means we are assigning the year component of the sighted string to the year column in each row. Consequently, each row is inserted into the partition associated with its sighting year.
To prove this is working as expected, we then perform two queries; one counts all records in the partition for 1989 in the partitioned table, and the other counts the records in ufodata whose sighted value begins with the string "1989", that is, the component used to dynamically create the partitions previously.
As can be seen, both queries return the same result, verifying that our partitioning strategy is working as expected. We also note that the partitioned query is a little faster than the other, though not by very much. This is likely due to the MapReduce start up times dominating the processing of our relatively modest data set.
Finally, we take a look inside the directory where Hive stores the data for the partitioned table and see that there is indeed a directory for each of the 100 dynamically-generated partitions. Any time we now express HiveQL statements that refer to specific partitions, Hive can perform a significant optimization by processing only the data found in the appropriate partitions' directories.
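If you would rather not poke around HDFS directly, Hive can also list the partitions it knows about for a table:

SHOW PARTITIONS partufo ;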
Bucketing, clustering, and sorting... oh my!
We will not explore it in detail here, but hierarchical partition columns are not the full extent of how Hive can optimize data access patterns within subsets of data. Within a partition, Hive provides a mechanism to further gather rows into buckets using a hash function on specified clustering columns. Within a bucket, the rows can be kept in sorted order on specified sort columns. We could, for example, have bucketed our data based on the UFO shape and within each bucket sorted on the sighting date.
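In the table definition itself, the relevant keywords are CLUSTERED BY and SORTED BY. A sketch of such a declaration for our data (the table name and bucket count are illustrative, and we do not create this table in the chapter) might look like the following:

SET hive.enforce.bucketing = true ;

CREATE TABLE bucketed_ufo(sighted string, reported string, sighting_location string,
shape string, duration string, description string)
PARTITIONED BY (year string)
CLUSTERED BY (shape) SORTED BY (sighted) INTO 32 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;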
These aren't necessarily features you'll need to use on day 1 with Hive, but if you find yourself using larger and larger data sets, then considering this type of optimization may help query processing time significantly.
User-Defined Function
Hive provides mechanisms for you to hook custom code directly into the HiveQL execution. This can be in the form of adding new library functions or by specifying Hive transforms, which work quite similarly to Hadoop Streaming. We will look at user-defined functions in this section as they are where you are most likely to have an early need to add custom code. Hive transforms are a somewhat more involved mechanism by which you can add custom map and reduce classes that are invoked by the Hive runtime. If transforms are of interest, they are well documented on the Hive wiki.
Time for action – adding a new User Defined Function (UDF)
Let us show how to create and invoke some custom Java code via a new UDF.
- Save the following code as City.java:
package com.kycorsystems ;

import java.util.regex.Matcher ;
import java.util.regex.Pattern ;
import org.apache.hadoop.hive.ql.exec.UDF ;
import org.apache.hadoop.io.Text ;

public class City extends UDF
{
    // Matches a one- or two-word city name followed by a comma or space
    // and then a non-letter character (typically the start of the state abbreviation)
    private static Pattern pattern = Pattern.compile(
        "[a-zA-Z]+?[\\. ]*[a-zA-Z]+?[\\, ][^a-zA-Z]") ;

    public Text evaluate(final Text str)
    {
        Text result ;
        String location = str.toString().trim() ;
        Matcher matcher = pattern.matcher(location) ;

        if (matcher.find())
        {
            // Drop the trailing separator characters (the last two characters of the match)
            result = new Text(
                location.substring(matcher.start(), matcher.end() - 2)) ;
        }
        else
        {
            result = new Text("Unknown") ;
        }
        return result ;
    }
}
- Compile this file:
$ javac -cp hive/lib/hive-exec-0.8.1.jar:hadoop/hadoop-1.0.4-core.jar -d . City.java
- Package the generated class file into a JAR file:
$ jar cvf city.jar com
You will receive the following response:
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/kycorsystems/(in = 0) (out= 0)(stored 0%)
adding: com/kycorsystems/City.class(in = 1101) (out= 647)(deflated 41%)
- Start the interactive Hive shell:
$ hive
- Add the new JAR file to the Hive classpath:
hive> add jar city.jar;
You will receive the following response:
Added city.jar to class path
Added resource: city.jar
- Confirm that the JAR file was added:
hive> list jars;
You will receive the following response:
file:/opt/hive-0.8.1/lib/hive-builtins-0.8.1.jar
city.jar
- Register the new code with a function name:
hive> create temporary function city as 'com.kycorsystems.City' ;
You will receive the following response:
OK
Time taken: 0.277 seconds
- Execute a query using the new function:
hive> select city(sighting_location), count(*) as total
    > from partufo
    > where year = '1999'
    > group by city(sighting_location)
    > having total > 15 ;
You will receive the following response:
Total MapReduce jobs = 1
Launching Job 1 out of 1
…
OK
Chicago 19
Las Vegas 19
Phoenix 19
Portland 17
San Diego 18
Seattle 26
Unknown 34
Time taken: 29.055 seconds
What just happened?
The Java class we wrote extends the base org.apache.hadoop.hive.ql.exec.UDF (User Defined Function) class. Into this class, we define a method for returning a city name given a location string that follows the general pattern we have seen previously.
The UDF base class does not actually define a series of evaluate methods based on type; instead, you are free to add your own with arbitrary arguments and return types. Hive uses Java reflection to select the correct evaluate method, and if you require a finer-grained selection, you can develop your own utility class that implements the UDFMethodResolver interface.
The regular expression used here is a little unwieldy; we wish to extract the name of the city, assuming it will be followed by a state abbreviation. However, inconsistency in how the names are delineated and handling of multi-word names gives us the regular expression seen before. Apart from this, the class is pretty straightforward.
We compile the City.java file, adding the necessary JARs from both Hive and Hadoop as we do so.
Note
Remember, of course, that the specific JAR filenames may be different if you are not using the same versions of both Hadoop and Hive.
We then bundle the generated class file into a JAR and start the Hive interactive shell.
After creating the JAR, we need to configure Hive to use it. This is a two-step process. Firstly, we use the add jar command to add the new JAR file to the classpath used by Hive. After doing so, we use the list jars command to confirm that our new JAR has been registered in the system.
Adding the JAR only tells Hive that some code exists; it does not say how we wish to refer to the function within our HiveQL statements. The CREATE TEMPORARY FUNCTION command does this, associating a function name (in this case, city) with the fully qualified Java class that provides the implementation (in this case, com.kycorsystems.City).
With both the JAR file added to the classpath and the function created, we can now refer to our city() function within our HiveQL statements.
We next ran an example query that demonstrates the new function in action. Going back to the partitioned UFO sightings table, we thought it would be interesting to see where the most UFO sightings were occurring as everyone prepared for the end-of-millennium apocalypse.
As can be seen from the HiveQL statement, we can use our new function just like any other and indeed the only way to know which functions are built-in and which are UDFs is through familiarity with the standard Hive function library.
The result shows a significant concentration of sightings in the north-west and south-west of the USA, with Chicago being the only exception. We did get quite a few Unknown results, however, and it would require further analysis to determine whether that was due to locations outside of the U.S. or whether we need to further refine our regular expression.
To preprocess or not to preprocess...
Let us revisit an earlier topic: the potential need to preprocess data into a cleaner form before it is imported into Hive. As can be seen from the preceding example, we could instead perform similar processing on the fly through a series of UDFs. We could, for example, add functions called state and country that extract or infer the region and country components from the sighting location string. There are rarely concrete rules for which approach is best, but a few guidelines may help.
If, as is the case here, we are unlikely to actually process the full location string for reasons other than to extract the distinct components, then preprocessing likely makes more sense. Instead of performing expensive text processing every time the column is accessed, we could either normalize it into a more predictable format or even break it out into separate city/region/country columns.
If, however, a column is usually used in HiveQL in its original form and additional processing is the exceptional case, then there is likely little benefit to an expensive processing step across the entire data set.
Use the strategy that makes the most sense for your data and workloads. Remember that UDFs are for much more than this sort of text processing, they can be used to encapsulate any type of logic that you wish to apply to data in your tables.
Hive versus Pig
Search the Internet for articles about Hive and it won't be long before you find many comparing Hive to another Apache project called Pig. Some of the most common questions around this comparison are why both exist, when to use one over the other, which is better, and which makes you look cooler when wearing the project t-shirt in a bar.
The two projects overlap in purpose: whereas Hive looks to present a familiar SQL-like interface to data, Pig uses a language called Pig Latin that specifies dataflow pipelines. Just as Hive translates HiveQL into MapReduce, which it then executes, Pig performs similar MapReduce code generation from Pig Latin scripts.
The biggest difference between HiveQL and Pig Latin is the amount of control expressed over how the job will be executed. HiveQL, just like SQL, specifies what is to be done but says almost nothing about how to actually structure the implementation. The HiveQL query planner is responsible for determining in which order to perform particular parts of the HiveQL command, in which order to evaluate functions, and so on. These decisions are made by Hive at runtime, analogous to a traditional relational database query planner; in Pig Latin, by contrast, the writer of the script explicitly specifies the sequence of operations in the dataflow.
Both approaches obviate the need to write raw MapReduce code; they differ in the abstractions they provide.
The choice of Hive versus Pig will depend on your needs. If having a familiar SQL interface to the data is important as a means of making the data in Hadoop available to a wider audience, then Hive is the obvious choice. If instead you have personnel who think in terms of data pipelines and need finer-grained control over how the jobs are executed, then Pig may be a better fit. The Hive and Pig projects are looking for closer integration so hopefully the false sense of competition will decrease and instead both will be seen as complementary ways of decreasing the Hadoop knowledge required to execute MapReduce jobs.
What we didn't cover
In this overview of Hive, we have covered its installation and setup, and the creation and manipulation of tables, views, and joins. We have looked at how to move data into and out of Hive, how to optimize data processing, and have explored several of Hive's built-in functions.
In reality, we have barely scratched the surface. In addition to more depth on the previous topics and a variety of related concepts, we did not even touch on topics such as the MetaStore, in which Hive stores its table definitions and other metadata, or SerDe (serializer/deserializer) objects, which can be used to read data from more complex file formats such as JSON.
Hive is an incredibly rich tool with many powerful and complex features. If Hive is something that you feel may be of value to you, then it is recommended that, after running through the examples in this chapter, you spend some quality time with the documentation on the Hive website. There you will also find links to the user mailing list, which is a great source of information and help.
Hive on Amazon Web Services
Elastic MapReduce has significant support for Hive, with specific mechanisms that help it integrate with other AWS services.
Time for action – running UFO analysis on EMR
Let us explore the use of EMR with Hive by doing some UFO analysis on the platform.
- Log in to the AWS management console at http://aws.amazon.com/console.
- Every Hive job flow on EMR runs from an S3 bucket, and we need to select the bucket we wish to use for this purpose. Select S3 to see the list of buckets associated with your account and then choose the bucket from which to run the example; in the example below, we select the bucket called garryt1use.
- Use the web interface to create three directories called ufodata, ufoout, and ufologs within that bucket. The resulting list of the bucket's contents should look like the following screenshot:
- Double-click on the ufodata directory to open it and within it create two subdirectories called ufo and states.
- Create the following as s3test.hql, click on the Upload link within the ufodata directory, and follow the prompts to upload the file:

CREATE EXTERNAL TABLE IF NOT EXISTS ufodata(sighted string, reported string,
    sighting_location string, shape string, duration string, description string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '${INPUT}/ufo' ;

CREATE EXTERNAL TABLE IF NOT EXISTS states(abbreviation string, full_name string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '${INPUT}/states' ;

CREATE VIEW IF NOT EXISTS usa_sightings (sighted, reported, shape, state) AS
SELECT t1.sighted, t1.reported, t1.shape, t2.full_name
FROM ufodata t1 JOIN states t2
ON (LOWER(t2.abbreviation) =
    LOWER(SUBSTR(t1.sighting_location, (LENGTH(t1.sighting_location)-1)))) ;

CREATE EXTERNAL TABLE IF NOT EXISTS state_results
    (reported string, shape string, state string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n' STORED AS TEXTFILE
LOCATION '${OUTPUT}/states' ;

INSERT OVERWRITE TABLE state_results
SELECT reported, shape, state
FROM usa_sightings
WHERE state = 'California' ;

The contents of ufodata should now look like the following screenshot:
- Double-click the states directory to open it and upload into it the states.txt file used earlier. The directory should now look like the following screenshot:
- Click on the ufodata component at the top of the file list to return to this directory.
- Double-click on the ufo directory to open it and upload into it the ufo.tsv file used earlier. The directory should now look like the following screenshot:
- Now select Elastic MapReduce and click on Create a New Job Flow. Then select the option Run your own application and select a Hive application, as shown in the following screenshot:
- Click on Continue and then fill in the required details for the Hive job flow. Use the following screenshot as a guide, but remember to change the bucket name (the first component in the s3:// URLs) to the bucket you set up before:
- Click on Continue, review the number and type of hosts to be used, and then click on Continue once again. Then fill in the name of the directory for the logs, as shown in the following screenshot:
- Click on Continue. Then continue through the rest of the job creation process, as there are no other default options that need to be changed for this example. Finally, start the job flow and monitor its progress from the management console.
- Once the job has completed successfully, go back to S3 and double-click on the ufoout directory. Within that should be a directory called states and within that, a file named something like 0000000. Double-click to download the file and verify that its contents look something like the following:

20021014    light     California
20050224    other     California
20021001    egg       California
20030527    sphere    California
What just happened?
Before we could actually execute our EMR job flow, we needed to do a bit of setup in the preceding example. Firstly, we used the S3 web interface to prepare the directory structure for our job. We created three main directories: one to hold the input data, one into which to write the results, and one for EMR to place the logs of the job flow execution.
The HiveQL script is a modification of several of the Hive commands used earlier in this chapter. It creates the tables for the UFO sighting data and state names as well as the view joining them. It then creates a new table with no source data and uses an INSERT OVERWRITE TABLE statement to populate that table with the results of a query.
The unique feature in this script is the way we specify the LOCATION clauses for each of the tables. For the input tables, we use a path relative to a variable called INPUT, and we do likewise with the OUTPUT variable for the result table.
Note that Hive in EMR expects the location of table data to be a directory and not a file. This is why we previously created a subdirectory for each table and uploaded the specific source file into it, instead of pointing the table at the data file directly.
After setting up the required file and directory structure within our S3 bucket, we went to the EMR web console and started the job flow creation process.
After specifying that we wish to use our own program and that it would be a Hive application, we filled in a screen with the key data required for our job flow:
- The location of the HiveQL script itself
- The directory containing input data
- The directory to be used for output data
The path to the HiveQL script is an explicit path and requires no further explanation. However, it is important to realize how the other two values are mapped into the variables used within our Hive script.
The value for the input path is available to the Hive script as the INPUT variable, and this is how we specify the directory containing the UFO sighting data as ${INPUT}/ufo. Similarly, the output value specified in this form is made available as the OUTPUT variable within our Hive script.
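For example, with the input field set to s3://garryt1use/ufodata (the bucket name from the earlier screenshots; yours will differ), the first CREATE statement in the script behaves as if it had been written with the resolved path:

-- The '${INPUT}/ufo' location from the script, after variable substitution.
CREATE EXTERNAL TABLE IF NOT EXISTS ufodata(sighted string, reported string,
    sighting_location string, shape string, duration string, description string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 's3://garryt1use/ufodata/ufo' ;

The '${INPUT}/states' and '${OUTPUT}/states' locations resolve in the same way beneath the ufodata and ufoout directories respectively.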
We did not make any changes to the default host setup, which will be one small master and two small core nodes. On the next screen, we added the location into which we wanted EMR to write the logs produced by the job flow execution.
Capturing these logs is optional but useful, particularly in the early stages of running a new script, though the S3 storage does of course have a cost. EMR can also write indexed log data into SimpleDB (another AWS service), but we did not show that in action here.
After completing the job flow definition, we started it and on successful execution, went to the S3 interface to browse to the output location, which happily contained the data we were expecting.
Using interactive job flows for development
When developing a new Hive script to be executed on EMR, the batch execution shown previously is not a good fit. There is usually a latency of several minutes between job flow creation and execution, and if the job fails, the cost of several hours of EC2 instance time will still have been incurred (partial hours are rounded up).
Instead of selecting the option to create an EMR job flow that runs a named Hive script, as in the previous example, we can start a Hive job flow in interactive mode. This effectively spins up a Hadoop cluster without requiring a script up front. You can then SSH into the master node as the hadoop user, where you will find Hive installed and configured. It is much more efficient to develop the script in this environment and then, if required, set up batch job flows to execute the finished script automatically in production.
Have a go hero – using an interactive EMR cluster
Start up an interactive Hive job flow in EMR. You will need to have SSH credentials already registered with EC2 so that you can connect to the master node. Run the previous script directly from the master node, remembering to pass the appropriate values for the script's variables; one possible approach is sketched below.
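A minimal sketch of one way to do this from the Hive shell on the master node follows. It assumes you have copied s3test.hql to the hadoop user's home directory, and the bucket name and path are illustrative only:

-- Define the variables the script expects, then execute it.
SET hivevar:INPUT=s3://garryt1use/ufodata;
SET hivevar:OUTPUT=s3://garryt1use/ufoout;
SOURCE /home/hadoop/s3test.hql;

If your Hive version does not resolve the unprefixed ${INPUT} and ${OUTPUT} references from variables set this way, the more reliable route is to supply them when invoking the hive command itself, using its -d (define) option together with -f to name the script file, which is effectively what EMR does when it runs the script for you.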
Integration with other AWS products
With a local Hadoop/Hive installation, the question of where data lives usually comes down to HDFS or local filesystems. As we have seen previously, Hive within EMR gives another option with its support for external tables whose data resides in S3.
Another AWS service with similar support is DynamoDB, a hosted NoSQL database service. Hive job flows within EMR can declare external tables that either read data from DynamoDB or use it as the destination for query output.
This is a very powerful model, as it allows Hive to process and combine data from multiple sources while the mechanics of mapping data from each system into Hive tables happen transparently. It also allows Hive to be used as a mechanism for moving data from one system to another; getting data into such hosted services from existing stores is frequently a major adoption hurdle.
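As a sketch only — the DynamoDB table name and column mapping below are invented for illustration, and the storage handler class and table properties should be checked against the current EMR documentation — declaring a DynamoDB-backed external table looks along these lines:

-- External table whose data lives in a DynamoDB table rather than in S3 or HDFS.
CREATE EXTERNAL TABLE dynamo_sightings (sighted string, shape string, state string)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES (
    "dynamodb.table.name" = "Sightings",
    "dynamodb.column.mapping" = "sighted:sighted,shape:shape,state:state"
);

Once declared, such a table can be read in SELECT statements or used as the target of an INSERT OVERWRITE, which is how Hive can shuttle data between S3, HDFS, and DynamoDB.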
Summary
We have looked at Hive in this chapter and learned how it provides many tools and features that will be familiar to anyone who uses relational databases. Instead of requiring development of MapReduce applications, Hive makes the power of Hadoop available to a much broader community.
In particular, we downloaded and installed Hive, learning that it is a client application that translates its HiveQL language into MapReduce code, which it submits to a Hadoop cluster. We explored Hive's mechanism for creating tables and running queries against these tables. We saw how Hive can support various underlying data file formats and structures and how to modify those options.
We also appreciated that Hive tables are largely a logical construct and that behind the scenes, all the SQL-like operations on tables are in fact executed by MapReduce jobs on HDFS files. We then saw how Hive supports powerful features such as joins and views and how to partition our tables to aid in efficient query execution.
We used Hive to output the results of a query to files on HDFS and saw how Hive is supported by Elastic MapReduce, where interactive job flows can be used to develop new Hive applications that can then be run automatically in batch mode.
As we have mentioned several times in this book, Hive looks like a relational database but is not really one. However, in many cases you will find that existing relational databases are part of the broader infrastructure with which you need to integrate. How to perform that integration, and how to move data across these different types of data sources, will be the topic of the next chapter.