The full source is available in the GitHub repo.

Development environment

  • NetBeans IDE 8.2
  • Java with Maven Project

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the sparkexample tutorial job.
  Besides the regular jar, the package phase also produces a
  "jar-with-dependencies" assembly (see the build section below).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>longtrieuuit</groupId>
    <artifactId>sparkexample</artifactId>
    <version>tutorial</version>
    <packaging>jar</packaging>

    <dependencies>
        <!-- Test-only library: "test" scope keeps it off the main compile classpath. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.11 -->
        <!-- Default (compile) scope, so it is bundled into the jar-with-dependencies assembly. -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.0.6</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <!-- "provided": needed to compile, but not bundled; the Spark runtime must supply it. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.0.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.0.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
            <type>jar</type>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <!-- Builds <artifactId>-<version>-jar-with-dependencies.jar during "mvn package". -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <!-- Pinned so the build does not depend on whichever default version Maven resolves. -->
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Sample source: interacting with Cassandra from Spark

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package longtrieuuit.sparkexample;

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import java.sql.SQLException;
import java.text.ParseException;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

/**
 * Minimal Spark driver that reads two columns of one Cassandra table through
 * the DataStax Spark Cassandra Connector and prints the number of rows.
 *
 * <p>The Spark master is not set in code; it is expected to be supplied when
 * the job is launched (for example via {@code spark-submit --master ...}).
 *
 * <p>NOTE(review): the Cassandra hosts and credentials are hard-coded in this
 * class. For anything beyond a tutorial, pass them in from outside the source
 * (for example as {@code --conf spark.cassandra.auth.*} options) instead.
 *
 * @author trieu
 */
public class Connect_SparkWithCassandra {

    private static final String CASSANDRA_DB_USER = "remoteuser";
    private static final String CASSANDRA_USER_PASS = "guest35!";
    private static final String TABLE_NAME = "d_registered_com_10000005";
    private static final String KEY_SPACES = "testdb";
    /** Comma-separated contact points handed to the connector. */
    private static final String CASSANDRA_HOSTS = "192.168.130.63,192.168.130.64,192.168.130.65,192.168.130.66,192.168.130.67,192.168.130.68,192.168.130.69,192.168.130.70,192.168.130.71,192.168.130.72,192.168.130.73,192.168.130.74";

    /**
     * Builds the Spark configuration, counts the rows of
     * {@code KEY_SPACES.TABLE_NAME} and prints the count to standard output.
     *
     * <p>The Spark context is opened in a try-with-resources block so that it
     * is stopped even when the job fails.
     *
     * @param args command-line arguments; not used
     * @throws SQLException declared for source compatibility with existing callers
     * @throws ClassNotFoundException declared for source compatibility with existing callers
     * @throws ParseException declared for source compatibility with existing callers
     */
    public static void main(String[] args) throws SQLException, ClassNotFoundException, ParseException {
        SparkConf sparkConf = buildSparkConf();

        try (JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf)) {
            JavaRDD<com.datastax.spark.connector.japi.CassandraRow> cassandraRDD =
                    CassandraJavaUtil.javaFunctions(javaSparkContext)
                            .cassandraTable(KEY_SPACES, TABLE_NAME)
                            // Only these two columns are fetched from Cassandra.
                            .select("yyyymmdd", "id");

            // Count CassandraRow
            System.out.println("CassandraRow count: " + cassandraRDD.count());
        }
    }

    /** Creates the Spark configuration used by this job (connector, serializer and retry settings). */
    private static SparkConf buildSparkConf() {
        // "true" also loads any spark.* Java system properties; the explicit
        // set(...) calls below override properties with the same key.
        SparkConf sparkConf = new SparkConf(true);
        sparkConf.setAppName("How to connect Spark with Cassandra App");

        // Cassandra connection and read settings.
        sparkConf.set("spark.cassandra.connection.host", CASSANDRA_HOSTS);
        sparkConf.set("spark.cassandra.auth.username", CASSANDRA_DB_USER);
        sparkConf.set("spark.cassandra.auth.password", CASSANDRA_USER_PASS);
        sparkConf.set("spark.cassandra.input.split.size_in_mb", "50");
        sparkConf.set("spark.cassandra.input.consistency.level", "TWO");

        // Serialization and message/result size limits.
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // Non-deprecated replacement for "spark.kryoserializer.buffer.max.mb" = "500".
        sparkConf.set("spark.kryoserializer.buffer.max", "500m");
        sparkConf.set("spark.broadcast.blockSize", "500M");
        sparkConf.set("spark.rpc.message.maxSize", "400");
        sparkConf.set("spark.driver.maxResultSize", "55g");

        // Executor sizing and retry behaviour.
        sparkConf.set("spark.executor.cores", "1");
        sparkConf.set("spark.shuffle.io.maxRetries", "20");
        sparkConf.set("spark.rpc.numRetries", "30");
        sparkConf.set("spark.task.maxFailures", "30");
        return sparkConf;
    }

}



Submit the application to the Spark cluster with this command:

# spark.akka.frameSize is no longer passed: it is a deprecated alias of
# spark.rpc.message.maxSize, which the application sets explicitly.
/home/ca/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
  --class longtrieuuit.sparkexample.Connect_SparkWithCassandra \
  --master spark://192.168.128.93:7077,192.168.128.94:7077 \
  --driver-memory 1g \
  --total-executor-cores 5 \
  --executor-memory 1G \
  --executor-cores 1 \
  /home/ca/javafile/sparkexample-tutorial-jar-with-dependencies.jar