This post describes how to use Sqoop (sqoop-1.4.4.bin__hadoop-1.0.0) and Hive (hive-0.11.0-bin) together with Hadoop (hadoop-1.2.1). The workflow is: use Sqoop to import your data from an RDBMS into Hive, then process the Hive data on the Hadoop cluster.
Hadoop Stuff
javac -cp ../hadoop-core-1.2.1.jar -d wordcount_classes WordCount.java
jar -cvf wordcount.jar -C wordcount_classes/ .
hadoop dfs -rmr /user/hduser/keyword-output
hadoop jar wordcount.jar WordCount /user/hduser/keyword /user/hduser/keyword-output
rm -f /tmp/keyword-output
hadoop dfs -getmerge /user/hduser/keyword-output /tmp/keyword-output
cat /tmp/keyword-output

=============================================================================
WordCount.java
=============================================================================
import java.io.FileInputStream;
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

    // Emits (token, 1) for every token that contains the configured keyword.
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable>, JobConfigurable {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private static String keyWord = null;

        // Called once per task; picks up the keyword that main() stored in the job configuration.
        public void configure(JobConf job) {
            keyWord = job.get("keyword");
            System.out.println("in configure keyword=" + keyWord);
        }

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            /* Earlier attempt: read the keyword from a local file inside the mapper.
            Properties properties;
            properties = new Properties();
            try {
                FileInputStream fis = new FileInputStream("/usr/local/hadoop/bin/keyword.properties");
                properties.load(fis);
                fis.close();
            } catch (Exception e) {
                System.out.println("Error reading keyword.properties");
            }
            keyWord = properties.getProperty("keyword");
            */
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                String kword = tokenizer.nextToken();
                // System.out.println("kword=" + kword + " keyword=" + keyWord);
                if (kword != null && kword.indexOf(keyWord) > -1) {
                    word.set(kword);
                    output.collect(word, one);
                }
            }
        }
    }

    // Sums the counts for each matching token; also used as the combiner.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        // Read the keyword from keyword.properties in the current directory.
        Properties properties = new Properties();
        try {
            FileInputStream fis = new FileInputStream("keyword.properties");
            properties.load(fis);
            fis.close();
        } catch (Exception e) {
            System.out.println("Error reading keyword.properties");
        }
        String keyWord = properties.getProperty("keyword");
        System.out.println("keyword=" + keyWord);
        if (keyWord == null) {
            // conf.set() rejects null values, so stop early with a clear message.
            System.err.println("No 'keyword' property found; aborting.");
            System.exit(1);
        }

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        // Pass the keyword to the map tasks through the job configuration.
        conf.set("keyword", keyWord);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

=============================================================================
run.sh
-----------------------------------------------------------------------------
#if name node is not starting
#rm -Rf /app/hadoop/tmp
#hadoop namenode -format
#hadoop dfs -copyFromLocal /tmp/gutenburg /user/hduser/keyword
javac -cp ../hadoop-core-1.2.1.jar -d wordcount_classes WordCount.java
jar -cvf wordcount.jar -C wordcount_classes/ .
hadoop dfs -rmr /user/hduser/keyword-output
hadoop jar wordcount.jar WordCount /user/hduser/keyword /user/hduser/keyword-output
rm -f /tmp/keyword-output
hadoop dfs -getmerge /user/hduser/keyword-output /tmp/keyword-output
cat /tmp/keyword-output

=============================================================================
Install Hive, Sqoop
=============================================================================
Sqoop tutorial
http://www.openscg.com/2013/07/hadoop-sqoop-tutorial-hdfs-hive-postgres-easy-data-moves/
http://bnlconsulting.com/index.php/blog/item/93-getting-started-with-hadoop-and-sqoop [this covers the Hive installation as well]
http://jugnu-life.blogspot.com/2012/03/sqoop-installation-tutorial.html

Hive tutorial
Use Sqoop to import data from MySQL/Oracle into Hive
Run Hive queries as Map/Reduce jobs

Hive JDBC
https://cwiki.apache.org/confluence/display/Hive/HiveClient
https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-JDBC

After Hive and Sqoop are installed
$SQOOP_HOME/bin/sqoop list-tables --connect jdbc:mysql://solo.mycom.com:3306/TPS --username tps_svcs --password ts211

To create the Hive table space
$HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

To check the Hive tables
$HADOOP_HOME/bin/hadoop fs -ls /user/hive/warehouse

To remove a Hive table
$HADOOP_HOME/bin/hadoop fs -rmr /user/hduser/ROC
$HADOOP_HOME/bin/hadoop fs -rmr /user/hive/warehouse/ROC

To import a table into Hive
NOTE: Don't set HBASE_HOME even if Sqoop complains about it.
$SQOOP_HOME/bin/sqoop import --connect jdbc:mysql://solo.mycom.com:3306/TPS --username myuser --password mypassword --table IDENTITY --hive-import

When there is no primary key, use --split-by
$SQOOP_HOME/bin/sqoop import --connect jdbc:mysql://solo.mycom.com:3306/blvd --username myuser --password mypassword --table cdrs --hive-import --split-by uniqueid

To start the Hive server
Note: sometimes Sqoop gets stuck with a strange error message such as "Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database ... metastore_db". Restarting the Hive server apparently fixes the problem.
$HIVE_HOME/bin/hive --service hiveserver

To run the TestDB
#!/bin/bash
HADOOP_HOME=/usr/local/hadoop-1.2.1
HIVE_HOME=/usr/local/hive-0.11.0-bin
echo -e '1\x01foo' > /tmp/a.txt
echo -e '2\x01bar' >> /tmp/a.txt
HADOOP_CORE=`ls $HADOOP_HOME/hadoop-core-1.2.1.jar`
CLASSPATH=.:$HADOOP_CORE:$HIVE_HOME/conf
for i in ${HIVE_HOME}/lib/*.jar ; do
    CLASSPATH=$CLASSPATH:$i
done
java -cp $CLASSPATH:. TestDB

=============================================================================
TestDB.java
=============================================================================
import java.sql.SQLException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.DriverManager;

public class TestDB {
    private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";

    public static void main(String[] args) throws SQLException {
        // Load the Hive JDBC driver; exit if it is not on the classpath.
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        }
        Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");
        Statement stmt = con.createStatement();
        String tableName = "identity";
        ResultSet res = stmt.executeQuery("select * from " + tableName);
        // Print the first column of every row.
        while (res.next()) {
            System.out.println(res.getString(1));
        }
        // Release the JDBC resources when done.
        res.close();
        stmt.close();
        con.close();
    }
}

To download the Hive JDBC jar
http://aws.amazon.com/developertools/1982901737448217

=============================================================================
Data Access Using Hadoop
=============================================================================
import java.io.*;
import java.sql.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.db.*;

// Holds one row (roc_id, ani) of the ROC table.
class MyRecord implements Writable, DBWritable {
    long id;
    String ani;

    public String getAni() {
        return ani;
    }

    // Hadoop serialization
    public void readFields(DataInput in) throws IOException {
        this.id = in.readLong();
        this.ani = Text.readString(in);
    }

    // JDBC: column order matches the fields array passed to setInput()
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getLong(1);
        this.ani = resultSet.getString(2);
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(this.id);
        Text.writeString(out, this.ani);
    }

    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setLong(1, this.id);
        stmt.setString(2, this.ani);
    }
}

public class DBWord {

    // Emits (ani, 1) for every row whose ani column contains the keyword.
    public static class MyMap extends MapReduceBase
            implements Mapper<LongWritable, MyRecord, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private static String keyWord = null;

        public void configure(JobConf job) {
            keyWord = job.get("keyword");
            System.out.println("in configure keyword=" + keyWord);
        }

        @Override
        public void map(LongWritable key, MyRecord value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.getAni();
            System.out.println("line=" + line);
            if (line != null && line.indexOf(keyWord) > -1) {
                word.set(line);
                output.collect(word, one);
            }
        }
    }

    public static class MyReduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        try {
            FileInputStream fis = new FileInputStream("keyword.properties");
            properties.load(fis);
            fis.close();
        } catch (Exception e) {
            System.out.println("Error reading keyword.properties");
        }
        String keyWord = properties.getProperty("keyword");
        System.out.println("keyword=" + keyWord);
        if (keyWord == null) {
            // conf.set() rejects null values, so stop early with a clear message.
            System.err.println("No 'keyword' property found; aborting.");
            System.exit(1);
        }

        JobConf conf = new JobConf(DBWord.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(MyMap.class);
        conf.setCombinerClass(MyReduce.class);
        conf.setReducerClass(MyReduce.class);
        conf.setInputFormat(DBInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://solo.nts.net/TPS", "tps_svcs", "ts211");
        String[] fields = {"roc_id", "ani"};
        DBInputFormat.setInput(conf, MyRecord.class, "ROC", null, "roc_id", fields);
        conf.set("keyword", keyWord);
        // The rows come from the database, so this input path appears to be unused by DBInputFormat.
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

=============================================================================
dbrun.sh
-----------------------------------------------------------------------------
#if name node is not starting
#rm -Rf /app/hadoop/tmp
#hadoop namenode -format
#hadoop dfs -copyFromLocal /tmp/gutenburg /user/hduser/keyword
javac -cp ../hadoop-core-1.2.1.jar:../lib/mysql-connector-java-5.1.14-bin.jar -d dbword_classes DBWord.java
jar -cvf dbword.jar -C dbword_classes/ .
#can't add mysql jar...
#jar uf dbword.jar ../lib/mysql-connector-java-5.1.14-bin.jar
hadoop dfs -rmr /user/hduser/keyword-output
hadoop jar dbword.jar DBWord /user/hduser/keyword /user/hduser/keyword-output
rm -f /tmp/keyword-output
hadoop dfs -getmerge /user/hduser/keyword-output /tmp/keyword-output
cat /tmp/keyword-output

=============================================================================
How to handle multiple tables in hadoop?
> > > If I get your requirement right you need to get in data from
> > > multiple rdbms sources and do a join on the same, also may be some more
> > > custom operations on top of this. For this you don't need to go in for
> > > writing your custom mapreduce code unless it is that required. You can
> > > achieve the same in two easy steps
> > > - Import data from RDBMS into Hive using SQOOP (Import)
> > > - Use hive to do some join and processing on this data

=============================================================================
http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/

Database Access with Apache Hadoop
by Aaron Kimball, March 06, 2009

Editor’s note (added Nov. 9, 2013): Valuable data in an organization is often stored in relational database systems. To access that data, you could use external APIs as detailed in this blog post below, or you could use Apache Sqoop, an open source tool (packaged inside CDH) that allows users to import data from a relational database into Apache Hadoop for further processing.
Sqoop can also export those results back to the database for consumption by other clients.

Apache Hadoop’s strength is that it enables ad-hoc analysis of unstructured or semi-structured data. Relational databases, by contrast, allow for fast queries of very structured data sources. A point of frustration has been the inability to easily query both of these sources at the same time. The DBInputFormat component provided in Hadoop 0.19 finally allows easy import and export of data between Hadoop and many relational databases, allowing relational data to be more easily incorporated into your data processing pipeline.

This blog post explains how the DBInputFormat works and provides an example of using DBInputFormat to import data into HDFS.

DBInputFormat and JDBC
First we’ll cover how DBInputFormat interacts with databases. DBInputFormat uses JDBC to connect to data sources. Because JDBC is widely implemented, DBInputFormat can work with MySQL, PostgreSQL, and several other database systems. Individual database vendors provide JDBC drivers to allow third-party applications (like Hadoop) to connect to their databases. Links to popular drivers are listed in the resources section at the end of this post.

To start using DBInputFormat to connect to your database, you’ll need to download the appropriate database driver from the list in the resources section (see the end of this post), and drop it into the $HADOOP_HOME/lib/ directory on your Hadoop TaskTracker machines, and on the machine where you launch your jobs from.

Reading Tables with DBInputFormat
The DBInputFormat is an InputFormat class that allows you to read data from a database. An InputFormat is Hadoop’s formalization of a data source; it can mean files formatted in a particular way, data read from a database, etc. DBInputFormat provides a simple method of scanning entire tables from a database, as well as the means to read from arbitrary SQL queries performed against the database.
Most queries are supported, subject to a few limitations discussed at the end of this article.

Configuring the job
To use the DBInputFormat, you’ll need to configure your job. The following example shows how to connect to a MySQL database and load from a table:

    CREATE TABLE employees (
      employee_id INTEGER NOT NULL PRIMARY KEY,
      name VARCHAR(32) NOT NULL);

Listing 1: Example table schema

    JobConf conf = new JobConf(getConf(), MyDriver.class);
    conf.setInputFormat(DBInputFormat.class);
    DBConfiguration.configureDB(conf,
        "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost/mydatabase");
    String[] fields = { "employee_id", "name" };
    DBInputFormat.setInput(conf, MyRecord.class, "employees",
        null /* conditions */, "employee_id", fields);
    // set Mapper, etc., and call JobClient.runJob(conf);

Listing 2: Java code to set up a MapReduce job with DBInputFormat

This example code will connect to mydatabase on localhost and read the two fields from the employees table.

The configureDB() and setInput() calls configure the DBInputFormat. The first call specifies the JDBC driver implementation to use and what database to connect to. The second call specifies what data to load from the database. The MyRecord class is the class where data will be read into in Java, and "employees" is the name of the table to read. The "employee_id" parameter specifies the table’s primary key, used for ordering results. The section “Limitations of the InputFormat” below explains why this is necessary. Finally, the fields array lists what columns of the table to read. An overloaded definition of setInput() allows you to specify an arbitrary SQL query to read from, instead.

After calling configureDB() and setInput(), you should configure the rest of your job as usual, setting the Mapper and Reducer classes, specifying any other data sources to read from (e.g., datasets in HDFS) and other job-specific parameters.
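
To make the role of the setInput() arguments more concrete, here is a minimal sketch of the kind of SELECT statement that a table name, an optional condition, an ordering key, and a field list describe. It uses only the JDK. The class and method names (SelectQuerySketch, buildSelect) are hypothetical, and this is not Hadoop's actual implementation, which may phrase the query differently.

```java
import java.util.StringJoiner;

// Illustrative sketch only: SelectQuerySketch and buildSelect are hypothetical
// names, and this is not the code DBInputFormat itself runs.
class SelectQuerySketch {

    /** Builds a SELECT over the given fields, optionally filtered, ordered by one key. */
    static String buildSelect(String table, String conditions, String orderBy, String... fields) {
        StringJoiner columns = new StringJoiner(", ");
        for (String field : fields) {
            columns.add(field);
        }
        StringBuilder sql = new StringBuilder("SELECT ")
                .append(columns)
                .append(" FROM ")
                .append(table);
        // A null or empty condition means "read the whole table".
        if (conditions != null && !conditions.isEmpty()) {
            sql.append(" WHERE (").append(conditions).append(")");
        }
        // The ordering key keeps repeated executions of the query in a stable order.
        if (orderBy != null && !orderBy.isEmpty()) {
            sql.append(" ORDER BY ").append(orderBy);
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // Same arguments as the setInput() call in Listing 2.
        System.out.println(buildSelect("employees", null, "employee_id", "employee_id", "name"));
        // prints: SELECT employee_id, name FROM employees ORDER BY employee_id
    }
}
```

Passing a condition such as "name IS NOT NULL" instead of null would add a WHERE clause in front of the ORDER BY.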

Retrieving the data
The DBInputFormat will read from the database, but how does data get to your mapper? The setInput() method used in the example above took, as a parameter, the name of a class which will hold the contents of one row. You’ll need to write an implementation of the DBWritable interface to allow DBInputFormat to populate your class with fields from the table. DBWritable is an adaptor interface that allows data to be read and written using both Hadoop’s internal serialization mechanism, and using JDBC calls. Once the data is read into your custom class, you can then read the class’ fields in the mapper.

The following example provides a DBWritable implementation that holds one record from the employees table, as described above:

    class MyRecord implements Writable, DBWritable {
      long id;
      String name;

      public void readFields(DataInput in) throws IOException {
        this.id = in.readLong();
        this.name = Text.readString(in);
      }

      public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getLong(1);
        this.name = resultSet.getString(2);
      }

      public void write(DataOutput out) throws IOException {
        out.writeLong(this.id);
        Text.writeString(out, this.name);
      }

      public void write(PreparedStatement stmt) throws SQLException {
        stmt.setLong(1, this.id);
        stmt.setString(2, this.name);
      }
    }

Listing 3: DBWritable implementation for records from the employees table

A java.sql.ResultSet object represents the data returned from a SQL statement. It contains a cursor representing a single row of the results. This row will contain the fields specified in the setInput() call. In the readFields() method of MyRecord, we read the two fields from the ResultSet. The readFields() and write() methods that operate on java.io.DataInput and DataOutput objects are part of the Writable interface used by Hadoop to marshal data between mappers and reducers, or pack results into SequenceFiles.
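
The Writable half of that contract can be tried without a cluster. The sketch below is a plain-JDK stand-in, assuming nothing from Hadoop: EmployeeRecordSketch and roundTrip are hypothetical names, and writeUTF()/readUTF() are swapped in for Hadoop's Text.writeString()/Text.readString(), so the byte layout differs from what Hadoop would produce. It only illustrates the rule that readFields() must read fields in the same order that write() wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Writable record; it does not depend on Hadoop.
class EmployeeRecordSketch {
    long id;
    String name;

    EmployeeRecordSketch() {
    }

    EmployeeRecordSketch(long id, String name) {
        this.id = id;
        this.name = name;
    }

    // Serialize the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeUTF(name); // assumption: JDK writeUTF instead of Text.writeString
    }

    // Deserialize in exactly the order used by write().
    void readFields(DataInput in) throws IOException {
        id = in.readLong();
        name = in.readUTF();
    }

    /** Serializes a record to bytes and reads it back into a fresh instance. */
    static EmployeeRecordSketch roundTrip(EmployeeRecordSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            EmployeeRecordSketch copy = new EmployeeRecordSketch();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EmployeeRecordSketch copy = roundTrip(new EmployeeRecordSketch(7L, "alice"));
        System.out.println(copy.id + " " + copy.name);
    }
}
```

If the two methods disagree on field order or types, the copy comes back corrupted or the read fails, which is the usual symptom of a mismatched Writable.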

Using the data in a mapper
The mapper then receives an instance of your DBWritable implementation as its input value. The input key is a row id provided by the database; you’ll most likely discard this value.

    public class MyMapper extends MapReduceBase
        implements Mapper<LongWritable, MyRecord, LongWritable, Text> {
      public void map(LongWritable key, MyRecord val,
          OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
        // Use val.id, val.name here
        output.collect(new LongWritable(val.id), new Text(val.name));
      }
    }

Listing 4: Example mapper using a custom DBWritable

Writing results back to the database
A companion class, DBOutputFormat, will allow you to write results back to a database. When setting up the job, call conf.setOutputFormat(DBOutputFormat.class); and then call DBConfiguration.configureDB() as before.

The DBOutputFormat.setOutput() method then defines how the results will be written back to the database. Its three arguments are the JobConf object for the job, a string defining the name of the table to write to, and an array of strings defining the fields of the table to populate. e.g., DBOutputFormat.setOutput(job, "employees", "employee_id", "name");.

The same DBWritable implementation that you created earlier will suffice to inject records back into the database. The write(PreparedStatement stmt) method will be invoked on each instance of the DBWritable that you pass to the OutputCollector from the reducer. At the end of reducing, those PreparedStatement objects will be turned into INSERT statements to run against the SQL database.

Limitations of the InputFormat
JDBC allows applications to generate SQL queries which are executed against the database; the results are then returned to the calling application. Keep in mind that you will be interacting with your database via repeated SQL queries. Therefore:

- Hadoop may need to execute the same query multiple times. It will need to return the same results each time. So any concurrent updates to your database, etc, should not affect the query being run by your MapReduce job. This can be accomplished by disallowing writes to the table while your MapReduce job runs, restricting your MapReduce’s query via a clause such as “insert_date < yesterday,” or dumping the data to a temporary table in the database before launching your MapReduce process.
- In order to parallelize the processing of records from the database, Hadoop will execute SQL queries that use ORDER BY, LIMIT, and OFFSET clauses to select ranges out of tables. Your results, therefore, need to be orderable by one or more keys (either PRIMARY, like the one in the example, or UNIQUE).
- In order to set the number of map tasks, the DBInputFormat needs to know how many records it will read. So if you’re writing an arbitrary SQL query against the database, you will need to provide a second query that returns the number of rows that the first query will return (e.g., by using COUNT and GROUP BY).

With these restrictions in mind, there’s still a great deal of flexibility available to you. You can bulk load entire tables into HDFS, or select large ranges of data. For example, if you want to read records from a table that is also being populated by another source concurrently, you might set up that table to attach a timestamp field to each record. Before doing the bulk read, pick the current timestamp, then select all records with timestamps earlier than that one. New records being fed in by the other writer will have later timestamps and will not affect the MapReduce job.

Finally, be careful to understand the bottlenecks in your data processing pipeline. Launching a MapReduce job with 100 mappers performing queries against a database server may overload the server or its network connection. In this case, you’ll achieve less parallelism than theoretically possible, due to starvation, disk seeks, and other performance penalties.
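
The ORDER BY / LIMIT / OFFSET splitting and the need for a row count can be sketched as follows. This is an assumption-laden illustration using only the JDK: SplitQuerySketch and splitQueries are hypothetical names, and the even chunking with the remainder going to the last range is one plausible scheme, not necessarily the one Hadoop uses.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: hypothetical names, not Hadoop's implementation.
class SplitQuerySketch {

    /**
     * Divides rowCount rows into numSplits contiguous ranges and returns one
     * query per range. baseQuery must already contain an ORDER BY clause so
     * that every range sees the rows in the same order.
     */
    static List<String> splitQueries(String baseQuery, long rowCount, int numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("numSplits must be at least 1");
        }
        List<String> queries = new ArrayList<>();
        long chunk = rowCount / numSplits;
        for (int i = 0; i < numSplits; i++) {
            long offset = i * chunk;
            // The last range also picks up the remainder of the division.
            long length = (i == numSplits - 1) ? rowCount - offset : chunk;
            queries.add(baseQuery + " LIMIT " + length + " OFFSET " + offset);
        }
        return queries;
    }

    public static void main(String[] args) {
        String base = "SELECT employee_id, name FROM employees ORDER BY employee_id";
        // 10 rows over 3 map tasks gives ranges of 3, 3 and 4 rows.
        for (String query : splitQueries(base, 10, 3)) {
            System.out.println(query);
        }
    }
}
```

The sketch also shows why the results must be repeatable: each range is fetched by a separate query, so a row inserted or deleted between two of them would shift the offsets and cause rows to be skipped or read twice.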

Limitations of the OutputFormat
The DBOutputFormat writes to the database by generating a set of INSERT statements in each reducer. The reducer’s close() method then executes them in a bulk transaction. Performing a large number of these from several reduce tasks concurrently can swamp a database. If you want to export a very large volume of data, you may be better off generating the INSERT statements into a text file, and then using a bulk data import tool provided by your database to do the database import.

Conclusions
DBInputFormat provides a straightforward interface to read data from a database into your MapReduce applications. You can read database tables into HDFS, import them into Hive, or use them to perform joins in MapReduce jobs. By supporting JDBC, it provides a common interface to a variety of different database sources.

This is probably best not used as a primary data access mechanism; queries against database-driven data are most efficiently executed within the database itself, and large-scale data migration is better done using the bulk data export/import tools associated with your database. But when analysis of ad hoc data in HDFS can be improved by the addition of some additional relational data, DBInputFormat allows you to quickly perform the join without a large amount of setup overhead. DBOutputFormat then allows you to export results back to the same database for combining with other database-driven tables.

DBInputFormat is available in Hadoop 0.19 and is provided by HADOOP-2536, a patch started by Fredrik Hedberg and further developed by Enis Soztutar. A backport of this patch that can be applied to Hadoop 0.18.3 is available at the above link.

This article is based on a talk I gave at the SF Bay Hadoop User Group meetup on February 18th; the slides from that talk are available as a PDF.
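
As a small illustration of the INSERT statements discussed under "Limitations of the OutputFormat", the sketch below builds a parameterized INSERT for a table and field list; the positional placeholders line up with the stmt.setLong(1, ...) and stmt.setString(2, ...) calls in the record's write(PreparedStatement) method. InsertStatementSketch and buildInsert are hypothetical names, and the exact text DBOutputFormat generates may differ.

```java
import java.util.Collections;

// Illustrative sketch only: hypothetical names, not DBOutputFormat's own code.
class InsertStatementSketch {

    /** Builds "INSERT INTO table (f1, f2) VALUES (?, ?)" for the given fields. */
    static String buildInsert(String table, String... fields) {
        if (fields.length == 0) {
            throw new IllegalArgumentException("at least one field is required");
        }
        String columns = String.join(", ", fields);
        // One positional placeholder per field, filled in later through PreparedStatement setters.
        String placeholders = String.join(", ", Collections.nCopies(fields.length, "?"));
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("employees", "employee_id", "name"));
        // prints: INSERT INTO employees (employee_id, name) VALUES (?, ?)
    }
}
```

Keeping values out of the SQL text and binding them through the placeholders avoids quoting problems in names or other string columns.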