Hey there!
Welcome to ClearUrDoubt.com.
In this post, we will look at a Spark program, written in Java, that loads table data from Cassandra into Hive.
Step 1: Create a table in Cassandra and insert records into it.
cqlsh:clearurdoubt> CREATE TABLE students(
                ... id int PRIMARY KEY,
                ... name text,
                ... marks int);
cqlsh:clearurdoubt> INSERT INTO students (id, name, marks) VALUES (101, 'Sidhu', 92);
cqlsh:clearurdoubt> INSERT INTO students (id, name, marks) VALUES (102, 'Sanju', 90);
cqlsh:clearurdoubt> INSERT INTO students (id, name, marks) VALUES (103, 'Ram', 45);
cqlsh:clearurdoubt> INSERT INTO students (id, name, marks) VALUES (105, 'Krishna', 88);
cqlsh:clearurdoubt> INSERT INTO students (id, name, marks) VALUES (104, 'Mounika', 90);
cqlsh:clearurdoubt>
cqlsh:clearurdoubt> select * from students;

  id | marks | name
-----+-------+---------
 105 |    88 | Krishna
 104 |    90 | Mounika
 102 |    90 |   Sanju
 101 |    92 |   Sidhu
 103 |    45 |     Ram

(5 rows)
cqlsh:clearurdoubt>
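(Optional) If you prefer to set up the table from Java instead of cqlsh, the DataStax Java driver can run the same CQL statements. This is a minimal sketch, assuming the com.datastax.oss:java-driver-core dependency is on the classpath and Cassandra is listening on the default 127.0.0.1:9042; the class name CreateStudentsTable is illustrative, not part of the original post:

import com.datastax.oss.driver.api.core.CqlSession;

public class CreateStudentsTable {
  public static void main(String[] args) {
    // With no explicit contact point, driver 4.x defaults to 127.0.0.1:9042
    try (CqlSession session = CqlSession.builder().withKeyspace("clearurdoubt").build()) {
      session.execute("CREATE TABLE IF NOT EXISTS students (id int PRIMARY KEY, name text, marks int)");
      session.execute("INSERT INTO students (id, name, marks) VALUES (101, 'Sidhu', 92)");
      // ... the remaining INSERTs follow the same pattern
    }
  }
}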
Step 2: Write a Spark program in Java that connects to Cassandra, reads the table, and writes it to HDFS as Parquet.
package com.clearurdoubt;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkCassandraDemo {
  public static void main(String[] args) {
    // Create a SparkConf object and set the Cassandra connection host
    SparkConf conf = new SparkConf().setAppName("Spark Cassandra Demo");
    conf.set("spark.cassandra.connection.host", "127.0.0.1");

    // Create/Get a SparkSession object
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    // Read the Cassandra table: specify the format as org.apache.spark.sql.cassandra
    // and provide the table & keyspace details
    Dataset<Row> studentsDS = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("table", "students")
        .option("keyspace", "clearurdoubt")
        .load();

    studentsDS.show();

    // Write the data to HDFS as Parquet; the output path is passed as the first argument
    studentsDS.write().mode("overwrite").format("parquet").save(args[0]);

    System.out.println("Successfully read data from Cassandra.");
  }
}
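A side note: the program above stages the data as Parquet on HDFS, and Step 3 below maps an external Hive table over it. If your Spark build ships with Hive support and can reach the Hive metastore (hive-site.xml on the classpath), Spark can also write into a Hive-managed table directly. Here is a minimal sketch of that variant; the enableHiveSupport() call and the table name clearurdoubt.students_managed are assumptions for illustration, not part of the original program:

package com.clearurdoubt;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkCassandraToHiveDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Spark Cassandra To Hive Demo");
    conf.set("spark.cassandra.connection.host", "127.0.0.1");

    // enableHiveSupport() requires a Spark build with Hive support
    // and a reachable Hive metastore (hive-site.xml on the classpath)
    SparkSession spark = SparkSession.builder()
        .config(conf)
        .enableHiveSupport()
        .getOrCreate();

    Dataset<Row> studentsDS = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("table", "students")
        .option("keyspace", "clearurdoubt")
        .load();

    // saveAsTable() creates/overwrites a Hive-managed table,
    // so no separate external-table DDL is needed afterwards
    studentsDS.write().mode("overwrite").saveAsTable("clearurdoubt.students_managed");

    spark.stop();
  }
}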
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.clearurdoubt</groupId>
  <artifactId>SparkCassandraDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.9.1</version>
    </dependency>
    <!-- Connector version aligned with Spark 2.3.x, matching the spark-submit
         command below (the 1.x connector line targets Spark 1.x) -->
    <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
  </dependencies>

  <properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
  </properties>

  <build>
    <finalName>spark_casssandra_demo</finalName>
  </build>
</project>
spark-submit command:
clearurdoubt@clearurdoubt:~$ spark-submit \
  --class com.clearurdoubt.SparkCassandraDemo \
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 \
  --master local \
  /home/clearurdoubt/workspace/SparkCassandraDemo/target/spark_casssandra_demo.jar \
  hdfs://localhost:9000/hdfs_home/clearurdoubt/datafiles/clearurdoubt/data/students
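To sanity-check the Parquet output before creating the Hive table, you can read the files straight back. This is a minimal sketch that could be appended at the end of main() in SparkCassandraDemo above; it reuses the existing spark session and the args[0] output path, and is not part of the original program:

// Hypothetical verification step, appended at the end of main() above:
// read the freshly written Parquet files back from HDFS
Dataset<Row> written = spark.read().parquet(args[0]);
written.printSchema();  // should show id, marks and name as nullable columns
written.show();         // should print the same 5 rows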
Output:
...
...
2018-10-29 00:03:57 INFO DAGScheduler:54 - Job 1 finished: show at SparkCassandraDemo.java:24, took 0.404808 s
+---+-----+-------+
| id|marks|   name|
+---+-----+-------+
|105|   88|Krishna|
|104|   90|Mounika|
|102|   90|  Sanju|
|101|   92|  Sidhu|
|103|   45|    Ram|
+---+-----+-------+

2018-10-29 00:03:58 INFO CassandraSourceRelation:35 - Input Predicates: []
...
...
2018-10-29 00:03:59 INFO ParquetWriteSupport:54 - Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "id",
    "type" : "integer",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "marks",
    "type" : "integer",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "name",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  optional int32 id;
  optional int32 marks;
  optional binary name (UTF8);
}

2018-10-29 00:04:00 INFO CodecPool:153 - Got brand-new compressor [.snappy]
...
...
2018-10-29 00:04:06 INFO InternalParquetRecordWriter:160 - Flushing mem columnStore to file. allocated memory: 27
2018-10-29 00:04:06 INFO FileOutputCommitter:535 - Saved output of task 'attempt_20181029000406_0002_m_000003_0' to hdfs://localhost:9000/hdfs_home/clearurdoubt/datafiles/clearurdoubt/data/students/_temporary/0/task_20181029000406_0002_m_000003
2018-10-29 00:04:06 INFO SparkHadoopMapRedUtil:54 - attempt_20181029000406_0002_m_000003_0: Committed
2018-10-29 00:04:06 INFO Executor:54 - Finished task 3.0 in stage 2.0 (TID 7). 2176 bytes result sent to driver
...
...
2018-10-29 00:04:07 INFO FileFormatWriter:54 - Finished processing stats for job null.
Successfully read data from Cassandra.
2018-10-29 00:04:14 INFO CassandraConnector:35 - Disconnected from Cassandra cluster: Test Cluster
2018-10-29 00:04:15 INFO SparkContext:54 - Invoking stop() from shutdown hook
2018-10-29 00:04:15 INFO SerialShutdownHooks:35 - Successfully executed shutdown hook: Clearing session cache for C* connector
2018-10-29 00:04:15 INFO AbstractConnector:318 - Stopped Spark@775d9a9e{HTTP/1.1,[http/1.1]}{0.0.0.0:4044}
2018-10-29 00:04:15 INFO SparkUI:54 - Stopped Spark web UI at http://192.168.1.30:4044
2018-10-29 00:04:15 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-10-29 00:04:15 INFO MemoryStore:54 - MemoryStore cleared
2018-10-29 00:04:15 INFO BlockManager:54 - BlockManager stopped
2018-10-29 00:04:15 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2018-10-29 00:04:15 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-10-29 00:04:15 INFO SparkContext:54 - Successfully stopped SparkContext
2018-10-29 00:04:15 INFO ShutdownHookManager:54 - Shutdown hook called
2018-10-29 00:04:15 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-8bc51582-086c-4b61-a60e-6845ebc9e058
2018-10-29 00:04:15 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-57057cd7-75a6-431d-8e36-b6a179898a80
clearurdoubt@clearurdoubt:~$
Step 3: Let's create an external Hive table on the output location (i.e., hdfs://localhost:9000/hdfs_home/clearurdoubt/datafiles/clearurdoubt/data/students) and check the imported data.
clearurdoubt@clearurdoubt:~$ hive
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/clearurdoubt/hive-2.3.3/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/clearurdoubt/hadoop-2.9.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Logging initialized using configuration in jar:file:/home/clearurdoubt/hive-2.3.3/lib/hive-common-2.3.3.jar!/hive-log4j2.properties Async: true
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> use clearurdoubt;
OK
Time taken: 5.282 seconds
hive> CREATE EXTERNAL TABLE students
    > (
    >   id int,
    >   name string,
    >   marks int
    > )
    > STORED AS PARQUET
    > LOCATION 'hdfs://localhost:9000/hdfs_home/clearurdoubt/datafiles/clearurdoubt/data/students';
OK
Time taken: 0.798 seconds
hive> select * from students;
OK
105	Krishna	88
104	Mounika	90
102	Sanju	90
101	Sidhu	92
103	Ram	45
Time taken: 1.679 seconds, Fetched: 5 row(s)
hive>
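The same external table is also visible to Spark SQL, so you can verify the data without leaving Spark. A minimal sketch, assuming a SparkSession created with enableHiveSupport() as in the variant shown after Step 2; this check is illustrative, not part of the original walkthrough:

// Hypothetical check from a Hive-enabled SparkSession
Dataset<Row> hiveStudents = spark.sql("SELECT id, name, marks FROM clearurdoubt.students");
hiveStudents.show();  // should list the same 5 students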
The 5 records have been imported from Cassandra successfully.
Happy learning :).
Please leave a reply in case of any queries.