Jul 24, 2015

Solr with Spark(s) - Or how to submit a Spark-Task which utilizes a Solr-Cloud from Java-Code


What this tutorial is about

We are going to setup an architecture combining the Big-Data computing framework Apache Spark with a sharded Apache Solr-Cloud. After that we will learn how to start Spark tasks from Java without Spark having any knowledge of the task itself.

For that purpose we will install three virtual machines using VirtualBox.
The following image briefly illustrates our goal.



Technologies used

For this tutorial we will work with the following technologies.
  • Apache Spark 1.4.0
  • Apache Solr 5.2.1
  • Zookeeper 3.4.6
  • VirtualBox 4.3
  • Ubuntu Server 64bit 15.04 (on each of our VMs)
  • Java 8 (openjdk-8-jdk)
  • Maven



Requirements

To complete all the steps with ease you should bring the following:
  • Some Linux Knowledge
  • Little VirtualBox experience
  • Maven
  • Internet connection (surprise!)
  • at least 8GB RAM recommended

A few words before we start

This post is meant as an introduction to the combination of these technologies and should not be seen as a fixed solution. The reason I am using VirtualBox for this tutorial is that it is accessible to anyone and serves the purpose well here. If you do not want to use it, just keep in mind that some parts of Spark depend on working name resolution (that's why we will be editing some hosts files later on).

The complete setup process is a big part of this tutorial, but if you are already familiar with configuring Spark and Solr you can go straight to Write some Code and skip the setup instructions.

It is also written in full tutorial style and goes through every step in detail. Various issues can occur at each step (and some did for me). Do not let yourself get deterred if anything does not run right immediately; I tried to point out possible problems wherever they appeared for me.

Furthermore, there is a troubleshooting section at the end of this post listing some problems and their solutions.

Setting up our virtual machines

The previous image shows the targeted design, which consists of:

  1. Hostname:   spark-master
      IP:              192.168.56.100
      Running:    Spark - 0 Workers, ZooKeeper

  2. Hostname:   spark1
      IP:              192.168.56.101
      Running:    Spark - 2 Workers, Solr - the first of two shards

  3. Hostname:   spark2
      IP:              192.168.56.102
      Running:    Spark - 2 Workers, Solr - the second of two shards

If you do not want to use that kind of setup, remember that those are the hostnames and IPs referenced throughout the whole tutorial. For the sake of simplicity the usernames and passwords are the same on all machines.
Moreover, all downloads go into the home directory and are extracted there as well.

1. First VM (spark1)

Create a new VM named spark1 with 2GB RAM in VirtualBox and add a second network-adapter (host-only) to it, as seen below:

So the VM will have two network adapters, one for internet access and another for internal communication.
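If you prefer the command line over the GUI, the host-only adapter can also be added with VBoxManage once the VM exists (vboxnet0 is the usual host-only network name on Linux and Mac hosts; on Windows it is typically 'VirtualBox Host-Only Ethernet Adapter' and needs quotes):
VBoxManage modifyvm spark1 --nic2 hostonly --hostonlyadapter2 vboxnet0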

Install Ubuntu Server 15.04 on that virtual machine. There is not much to be considered here (choose eth0 as installation NIC).

One thing I strongly recommend is installing the OpenSSH server, since this will massively improve the configuration process.
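If you did not select it during the Ubuntu installation, it can be added afterwards with:
sudo apt-get -y install openssh-server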


1.1. Adding our host only network adapter

After installation we need to give our network-adapter a static IP address, so add the following lines to the end of /etc/network/interfaces:
auto eth1
iface eth1 inet static
address 192.168.56.101
netmask 255.255.255.0
Now we can bring up eth1 with sudo ifup eth1 and should be able to ping our host's IP address (for example 192.168.56.1).

Side Note:
If you are unable to ping your host, two issues are possible:
 - The host-only network has not yet been fully initialized
   Possible solution: Just wait. The network should come up shortly.
 - The host's firewall is blocking the communication
   Possible solution: Add an allowance rule to your host's firewall for the 192.168.56.0/24 network.

1.2. (Optional) Using SSH

To make things easier, let's connect to the virtual machine with an SSH tool (such as PuTTY).

1.3. Disabling IPv6

Since IPv6 is not needed in this setup, and Hadoop (which ships as part of the Spark installation) cannot cope with it anyway, let's just disable it and eliminate potential trouble (see: HadoopIPv6). To do so, insert the following lines at the end of /etc/sysctl.conf:
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
and run sudo sysctl -p to reload that configuration file.
You can verify that IPv6 is disabled by running cat /proc/sys/net/ipv6/conf/all/disable_ipv6; if it prints '1' to the console, you are done.

1.4. Changing the hostname

The machine's hostname should be spark1 for this tutorial, so let's set that up.
Make /etc/hosts look like this:
127.0.0.1       localhost 
# Same for all three VMs
192.168.56.100 spark-master
192.168.56.101 spark1
192.168.56.102 spark2
Also change the name in /etc/hostname to spark1
You have reached a point where I would recommend taking a snapshot of the machine.

1.5. Software Preparation

Before adding any new programs, update the repositories and the already installed software by typing:
sudo apt-get -y update && sudo apt-get -y upgrade
The Spark task we will implement later depends on Java 8, so we need that:
sudo apt-get -y install openjdk-8-jdk
Those steps might take a while.
To add the JAVA_HOME environment variable (temporarily) type:
export JAVA_HOME="/usr"
and add the following line to the end of  /etc/environment (permanently):
JAVA_HOME="/usr"
Now get Solr by finding an appropriate mirror (Apache Solr) and downloading and extracting it.
For instance:
wget http://apache.mirror.iphh.net/lucene/solr/5.2.1/solr-5.2.1.tgz
tar -xvf solr-5.2.1.tgz
Do the same for Spark (Apache Spark)
For instance:
wget http://apache.mirror.digionline.de/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.6.tgz
tar -xvf spark-1.4.0-bin-hadoop2.6.tgz

2. Second VM (spark2)

Now clone spark1 and do the upcoming steps, then reboot that machine:
  • change the MAC addresses for both NICs
  • change the hostname for this VM to spark2 in /etc/hostname
  • change the IP address to 192.168.56.102 in /etc/network/interfaces (shown below)
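On spark2 the static network configuration in /etc/network/interfaces then becomes (same as on spark1, just with the new address):
auto eth1
iface eth1 inet static
address 192.168.56.102
netmask 255.255.255.0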

3. Third VM (spark-master)

Now clone spark1 or spark2 and do the upcoming steps, then reboot that machine:
  • change the MAC addresses for both NICs
  • change the hostname for this VM to spark-master in /etc/hostname
  • change the IP address to 192.168.56.100 in /etc/network/interfaces

3.1. Install ZooKeeper

Go to Apache ZooKeeper, find a mirror, then download and extract it.

Copy ~/zookeeper-3.4.6/conf/zoo_sample.cfg to ~/zookeeper-3.4.6/conf/zoo.cfg.
Change the dataDir in zoo.cfg to some accessible folder, for instance a newly created myData directory inside the ZooKeeper folder, and then start it:
./zookeeper-3.4.6/bin/zkServer.sh start
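For reference, the dataDir change could look like this (assuming ZooKeeper was extracted into your home directory; replace youruser with your actual user name):
mkdir ~/zookeeper-3.4.6/myData
# in ~/zookeeper-3.4.6/conf/zoo.cfg
dataDir=/home/youruser/zookeeper-3.4.6/myData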

4. Changing our host's hosts

Just to be clear, this means your own computer, not any of the virtual machines. To make sure that the IP 192.168.56.100 is reachable by the name spark-master, add the following line to the hosts file:
192.168.56.100 spark-master
On Windows you find the hosts file in C:\Windows\System32\Drivers\etc.
I reckon you know where to find it on Linux.
As I mentioned in the foreword, this hosts editing is needed due to the missing name resolution in a host-only network.

5. Setting up Spark


5.1. The spark-master

First it is necessary to configure the spark-master. To do so, run the following command inside the ~/spark-1.4.0-bin-hadoop2.6/conf folder:
cp spark-env.sh.template spark-env.sh
The name spark-env.sh is required, since this will be run by Spark!
Edit the spark-env.sh and add those lines to the end:
# The Masters IP: THIS LINE IS ESSENTIAL, as otherwise workers will get a "connection refused"
export SPARK_MASTER_IP="192.168.56.100"
  
# We want to bind spark to our VM-network
export SPARK_LOCAL_IP="192.168.56.100"
 
# We do not want our master to spawn workers
export SPARK_WORKER_INSTANCES=0
Start the master by running the following command inside Spark's folder:
./sbin/start-master.sh
By visiting http://192.168.56.100:8080/ inside your browser you should see the following:


There are no workers assigned yet; that is going to be our next step.

5.2. Workers from spark1

Secondly, create the spark-env.sh on spark1 as we just did for the spark-master, and add the following lines to the end:
# the spark-master ip
export SPARK_MASTER_IP="192.168.56.100"
  
# the ip we want to bind our worker to
export SPARK_LOCAL_IP="192.168.56.101"

#Number of cores per worker-instance
export SPARK_WORKER_CORES=1
  
# Number of workers to spawn on this machine
export SPARK_WORKER_INSTANCES=2
After that start the workers by running:
./sbin/start-slave.sh spark://192.168.56.100:7077
You may be asked to enter the spark-master's password. This can be avoided by setting up passwordless SSH keys, which is not covered in this tutorial.

By visiting http://192.168.56.100:8080/ again, you should see the two workers from spark1 now:


Side Note:
On my Windows 8.1 host machine the host-only network seemed a little unstable, which sometimes made the VMs unable to connect to each other. I overcame this issue by pinging around until the connections worked again.

5.3. Workers from spark2

Last but not least, it is spark2's turn. Do the same as you did on spark1, but keep in mind to change the SPARK_LOCAL_IP.
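For reference, the complete spark-env.sh on spark2 then contains:
# the spark-master ip
export SPARK_MASTER_IP="192.168.56.100"

# the ip we want to bind our worker to
export SPARK_LOCAL_IP="192.168.56.102"

# Number of cores per worker-instance
export SPARK_WORKER_CORES=1

# Number of workers to spawn on this machine
export SPARK_WORKER_INSTANCES=2
Then start the workers with ./sbin/start-slave.sh spark://192.168.56.100:7077 as before.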

After that, four workers should be visible:


5.4. Testing with spark-shell

Let's test whether Spark is running correctly by doing an example from Spark's Quick Start guide.

Type this command inside the spark folder on any machine:
./bin/spark-shell --master spark://192.168.56.100:7077
And verify that the spark-shell shows up as a Running Application in the master's web UI:

You can play around by doing some calculations from the quick-start guide mentioned.


6. Setting up Solr


6.1. Solr first shard

Log into spark1, go to the Solr server folder ~/solr-5.2.1/server/solr, create a new directory called spark-shard1 and copy the solr.xml and the zoo.cfg from the current directory into that new folder.
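In commands (run from inside ~/solr-5.2.1/server/solr):
mkdir spark-shard1
cp solr.xml zoo.cfg spark-shard1/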

Now start Solr by running:
~/solr-5.2.1/bin/solr start -cloud -s solr/spark-shard1 -h 192.168.56.101 -p 8983 -z 192.168.56.100:2181
The '-h <host_ip>' is important here, as otherwise Solr may be bound to 127.0.0.1.

You should now be able to see the Solr admin console at http://192.168.56.101:8983/solr/#/

6.2. Solr second shard

Do something similar on spark2 (short instructions):
  • Go to the Solr server folder: ~/solr-5.2.1/server/solr
  • Create a directory named spark-shard2
  • Copy both configs into that folder
Start Solr:
~/solr-5.2.1/bin/solr start -cloud -s solr/spark-shard2 -h 192.168.56.102 -p 8983 -z 192.168.56.100:2181
Another Solr dashboard should now be reachable at http://192.168.56.102:8983/solr/#/

6.3. Feeding the ZooKeeper

This step is about giving our ZooKeeper a Solr configuration which it can then share among our shards.
To do so copy an example config set (still inside the ~/solr-5.2.1/server/solr folder):
cp -ra configsets/basic_configs/ configsets/spark_configs/
Edit the schema.xml inside configsets/spark_configs/conf/ and add a title field to the Solr schema by placing the following line below the already defined id field:
<field name="title" type="string" indexed="true" stored="true" required="true" multiValued="false"/>
Go to the '~/solr-5.2.1/server/scripts/cloud-scripts' folder and upload the config set to ZooKeeper:
./zkcli.sh -zkhost 192.168.56.100:2181 -cmd upconfig -confname spark_config \
   -confdir ~/solr-5.2.1/server/solr/configsets/spark_configs/conf 
Now let's use the ZooKeeper command line to check whether it is aware of that config:
./zkcli.sh -zkhost 192.168.56.100:2181 -cmd list
Our spark_config should be listed there.

Side note:
If the config upload fails due to something like:
./zkcli.sh: line 13: unzip: command not found
Install unzip:
sudo apt-get install -y unzip


6.4. Create the Solr-Cloud

Let's create our cloud by using Solr's Collections API.

curl 'http://192.168.56.102:8983/solr/admin/collections?action=CREATE&name=spark&numShards=2&maxShardsPerNode=1&collection.configName=spark_config'

If you can find <int name="status">0</int> in the response you are good.
This command creates a new sharded collection using the recently uploaded spark_config.

If for any reason you are not happy with the sharding, you can delete the collection with the following command:
curl 'http://192.168.56.102:8983/solr/admin/collections?action=DELETE&name=spark'
 The shards can be viewed in the admin panel:



6.5. Sending data to the cloud

We now have our collection, but without any data. For indexing we have two possibilities: just index and let Solr decide on which shard to put each document, or use our own document routing by prefixing the indexed document's id with a shard key followed by an exclamation mark ('!').

If you are interested in the latter, you will find a SimpleShardingApp in the GitHub sources at the end of this post.

Do some indexing either way before proceeding any further.
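If you just need some quick test data and do not want to jump ahead to the sources, a minimal SolrJ sketch (not the SimpleShardingApp itself) could look like this, assuming solr-solrj 5.2.1 is on the classpath; the 'one'/'two' prefixes and the document count are arbitrary choices:
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class QuickIndexer {
 public static void main(String[] args) throws Exception {
  // Connect through ZooKeeper, just like the collection was created
  CloudSolrClient client = new CloudSolrClient("192.168.56.100:2181");
  client.setDefaultCollection("spark");

  for (int i = 0; i < 1000; i++) {
   SolrInputDocument doc = new SolrInputDocument();
   // Everything before the '!' is the routing key that decides the target shard
   String routingPrefix = (i % 2 == 0) ? "one" : "two";
   doc.addField("id", routingPrefix + "!" + i);
   doc.addField("title", "Document number " + i);
   client.add(doc);
  }
  client.commit();
  client.close();
 }
}
Run it once from your IDE and the documents will be spread across both shards according to their prefix.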

Side note:
You can query only one specific shard by visiting:

http://192.168.56.101:8983/solr/spark/select?shards=192.168.56.101:8983/solr/spark&q=*:*

Notice the shards=192.168.56.101:8983/solr/spark part where you can change the target shard.


7. Start-Up Sequence

In case you needed to reboot the virtual machines, or just want to have the full start sequence at a glance, here it is.


7.1. spark-master

./zookeeper-3.4.6/bin/zkServer.sh start
./spark-1.4.0-bin-hadoop2.6/sbin/start-master.sh

7.2. spark1

./solr-5.2.1/bin/solr start -cloud -s solr/spark-shard1 -h 192.168.56.101 -p 8983 -z 192.168.56.100:2181
./spark-1.4.0-bin-hadoop2.6/sbin/start-slave.sh spark://192.168.56.100:7077

7.3. spark2

./solr-5.2.1/bin/solr start -cloud -s solr/spark-shard2 -h 192.168.56.102 -p 8983 -z 192.168.56.100:2181
./spark-1.4.0-bin-hadoop2.6/sbin/start-slave.sh spark://192.168.56.100:7077

Write some Code

All we have done so far is set up the desired infrastructure. The next three steps are all about coding. We will work our way up from a very simple task to the kind of task promised in the title of this post.

1. Building your very first Spark task from Code

All the upcoming steps must be done on the host machine, because our Spark tasks will be submitted from there.

For the connection between Spark and Solr we need a toolkit called spark-solr made by LucidWorks. Download it from Github and extract it.
After that go into that folder and run Maven to install it into your local repository since we will depend on it soon:
mvn clean install
We should now be capable of creating our application, so create a new Maven project in your favourite IDE and add our freshly built spark-solr as a dependency:
<dependency>
    <groupId>com.lucidworks.spark</groupId>
    <artifactId>spark-solr</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
Add a new class with the following main method:
public static void main(String[] args) throws Exception {
 SparkConf conf = new SparkConf()
    .setAppName("Higher Math")
    .setMaster("spark://192.168.56.100:7077")
    .set("spark.driver.host", "192.168.56.1");

 JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

 List<String> calculationCounts = Arrays.asList("one", "two", "three");

 JavaRDD<String> logData = javaSparkContext.parallelize(calculationCounts);

 System.out.println("Counting to... " + logData.count());

 javaSparkContext.stop();
}
If you run this you should see a console full of log output and somewhere in it Counting to... 3. In the Spark master panel you can see the task we have just run:



Nice, we just created our first Spark task from Java code! We counted the elements of the calculationCounts list on two different machines!

Side Note:
There may be an IOException (Could not locate executable null\bin\winutils.exe in the Hadoop binaries) shown in the console, which does not bother us for the sake of this tutorial. It is caused by Hadoop not being installed on the host and thus no environment variable pointing to its binaries.


2. Adding customized functionality to our Spark task

For better customization we want to be able to submit tasks which are not just counting but using anonymous classes or lambdas for more advanced calculations.

For that purpose we use the following code to solve a well-known calculation:
public static void main(String[] args) throws Exception {
 SparkConf conf = ContextMaker.makeSparkConfig("Counting Factorials");

 JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

 List<Integer> calculationCounts = Arrays.asList(5000, 1000, 5000, 2000);

 JavaRDD<Integer> logData = javaSparkContext.parallelize(calculationCounts);

 // Using lambda here...
 logData.foreach(nrOfCalculations -> {
  LongStream.rangeClosed(1, nrOfCalculations)
    .mapToObj(BigInteger::valueOf)
    .forEach(AdvancedSparkJobApp::calculateFactorial);
 });

 System.out.printf("Calculated '%s' factorials %n",
  calculationCounts.stream().reduce(0, (x, y) -> x + y));
 
 javaSparkContext.stop();
}

private static BigInteger calculateFactorial(BigInteger n) {
 return LongStream.rangeClosed(2, n.longValue())
   //.parallel() // can be used to increase performance
   .mapToObj(BigInteger::valueOf)
   .reduce(BigInteger.valueOf(1), BigInteger::multiply);
}
This code calculates 13,000 factorials split into four partitions. The calculationCounts list defines how many factorials should be calculated per partition, so the first worker calculates the factorials of 1 to 5000, the second of 1 to 1000, and so on.

If you run this right now, the task will fail due to:
"java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda..."

Important: This means the workers do not know the lambda we are using in the foreach call. To overcome this issue, we must package our Maven project and add the created jar to the JavaSparkContext. In my case:
javaSparkContext.addJar("target/blog-test-1.0-SNAPSHOT.jar");
Now by running the main() we will notice Calculated '13000' factorials in the console output.
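A quick note on ContextMaker.makeSparkConfig: it is a small helper class from the sources at the end of this post. A minimal sketch of what it might look like, simply reusing the SparkConf settings from the first example:
import org.apache.spark.SparkConf;

public class ContextMaker {
 public static SparkConf makeSparkConfig(String appName) {
  // Same settings as in the first example, with a configurable application name
  return new SparkConf()
    .setAppName(appName)
    .setMaster("spark://192.168.56.100:7077")
    .set("spark.driver.host", "192.168.56.1");
 }
}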


3. Use Solr in a Spark task

Finally we will do what I promised in the foreword: use data from Solr. The following code does that for us; it queries Solr for all documents with a non-empty title field.
public static void main(String[] args) throws Exception {
 String zkHost = "192.168.56.100:2181";
 String collection = "spark";
 String queryStr = "*:*";

 SparkConf conf = ContextMaker.makeSparkConfig("Counting Factorials");

 JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

 // May differ for you
 javaSparkContext.addJar("target/blog-test-1.0-SNAPSHOT.jar");

 SolrRDD solrRDD = new SolrRDD(zkHost, collection);
 final SolrQuery solrQuery = SolrRDD.toQuery(queryStr);
 JavaRDD<SolrDocument> solrJavaRDD = solrRDD.query(javaSparkContext.sc(), solrQuery);

 JavaRDD<String> titleNumbers = solrJavaRDD.flatMap(doc -> {
  Object possibleTitle = doc.get("title");
  String title = possibleTitle != null ? possibleTitle.toString() : "";
  return Arrays.asList(title);
 }).filter(s -> !s.isEmpty());

 System.out.println("\n# of found titles: " + titleNumbers.count());

 javaSparkContext.stop();
}

Important: Wait a moment, this code won't work right away either! We need to add some more jars to the JavaSparkContext.
In detail, those jars are:
  • spark-solr-1.0-SNAPSHOT.jar
  • solr-solrj-5.2.1.jar
  • noggit-0.6.jar
  • httpmime-4.4.1.jar
You can find the latter three in an online Maven repository or build them yourself. There may be more recent versions of httpmime and noggit, but the ones mentioned are the minimum required.

Create a folder in your project, put all the needed jars into it, and add them to the JavaSparkContext:
javaSparkContext.addJar("libs_to_transfer/spark-solr-1.0-SNAPSHOT.jar");
javaSparkContext.addJar("libs_to_transfer/solr-solrj-5.2.1.jar");
javaSparkContext.addJar("libs_to_transfer/noggit-0.6.jar");
javaSparkContext.addJar("libs_to_transfer/httpmime-4.4.1.jar");
By running the program now, you should see something like # of found titles: 66666, as every document has a title (if you have used the SimpleShardingApp).

That's it; we have come a long way to make Spark accept a task with custom functionality that queries a Solr cloud.


Side Note:
We can even do some SQL-Like querying on that SolrRDD (this is based on the spark-solr example called SolrQueryProcessor.java):
// Now use schema information in Solr to build a queryable SchemaRDD
SQLContext sqlContext = new SQLContext(javaSparkContext);

// Pro Tip: SolrRDD will figure out the schema if you don't supply a list of field names in your query
DataFrame tweets = solrRDD.asTempTable(sqlContext, queryStr, "documents");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT * FROM documents where id LIKE 'one%'");

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
JavaRDD<Row> resultsRDD = results.javaRDD();

System.out.println("\n\n# of documents where 'id' starts with 'one': " + resultsRDD.count());
That snippet counts all documents whose id begins with 'one'.


Trouble-Shooting

As promised, I have listed some issues and possible solutions for them. All of them had come up for me.
  • If you cannot ping or connect to the VMs, your host's firewall may be blocking this kind of traffic. Add an allowance rule to your host's firewall for the 192.168.56.0/24 network.
  • If you run a Spark application on the host and it is unable to connect to the spark-master, name resolution may not be working properly; check the hosts files for the correct hostnames.
  • I had big trouble with my Windows 8.1 VirtualBox host-only network, as it seemed to collapse from time to time. Ping to see whether a VM is reachable; if not, rebooting the VM may help.
  • If the Spark task is visible on the website but the application seems to be stuck in a loop, check the logs on the spark-master (in its logs folder); it can be caused by a version mismatch between the host running the app and the spark-master.
  • If you tried to create the Solr collection but got an error 400 saying that there are too few nodes, one of the Solr shards may be unreachable or down.

Final words

This tutorial has illustrated the long way from nothing to a working Spark/Solr-Cloud combination and provided some additional information that helps to understand the components a little better.

The architecture and the examples were oversimplified for the purpose of pointing out the interaction between those technologies and providing easy access to the whole topic.



Sources
