Wednesday, April 6, 2016

Quick Start Guide to Run YCSB on MongoDB


Note: The process below was tested on a standalone MongoDB instance only.
1.     Install and Start MongoDB
Please refer to the MongoDB standalone installation best-practice and reference guide.

2.     Install Java
By default, RHEL 6 ships with Java version "1.7.0_65" (OpenJDK Runtime Environment, rhel-2.5.1.2.el6_5-x86_64 u65-b17).
You can use that, or install the latest Java version from Oracle; YCSB works with both.

rpm -Uvh jdk-8u73-linux-x64.rpm

or install using yum
yum install java-devel

Update the JAVA_HOME and PATH variables in the .bash_profile of root, or of whichever user you log in with.
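For example, if the Oracle JDK RPM above was used, lines like the following could be appended to .bash_profile (a minimal sketch; /usr/java/jdk1.8.0_73 is the RPM's usual install path and may differ on your system):
export JAVA_HOME=/usr/java/jdk1.8.0_73   # assumed install path, adjust to your JDK
export PATH=$JAVA_HOME/bin:$PATH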

3.     Install Maven
wget http://ftp.heanet.ie/mirrors/www.apache.org/dist/maven/maven-3/3.1.1/binaries/apache-maven-3.1.1-bin.tar.gz
sudo tar xzf apache-maven-*-bin.tar.gz -C /usr/local
cd /usr/local
sudo ln -s apache-maven-* maven
sudo vi /etc/profile.d/maven.sh
Add the following to maven.sh
export M2_HOME=/usr/local/maven
export PATH=${M2_HOME}/bin:${PATH}
Reload bash and test mvn
bash
mvn -version

4.     Install and Set Up YCSB
Download the YCSB release tarball and extract it:
curl -O --location https://github.com/brianfrankcooper/YCSB/releases/download/0.5.0/ycsb-0.5.0.tar.gz
tar xfvz ycsb-0.5.0.tar.gz
cd ycsb-0.5.0

Try the commands below. If they run successfully you are good to proceed; if not, the Python argparse package has to be installed (RHEL 6 does not include it by default).

$ ./bin/ycsb shell basic
> help
Commands:
  read key [field1 field2 ...] - Read a record
  scan key recordcount [field1 field2 ...] - Scan starting at key
  insert key name1=value1 [name2=value2 ...] - Insert a new record
  update key name1=value1 [name2=value2 ...] - Update a record
  delete key - Delete a record
  table [tablename] - Get or [set] the name of the table
  quit - Quit
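If the ycsb script instead fails with an import error for argparse, install the module first (a sketch; on RHEL 6 the EPEL package name is assumed to be python-argparse):
yum install python-argparse      # from the EPEL repository
# or, if pip is available:
pip install argparse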


Run YCSB
Now you are ready to run! First, use the asynchronous driver to load the data:
./bin/ycsb load mongodb-async -s -P workloads/workloada > outputLoad.txt
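By default the binding connects to mongodb://localhost:27017/ycsb and workloada loads 1,000 records. Both can be overridden on the command line (a sketch; replace <host> with your MongoDB server and pick your own record count):
./bin/ycsb load mongodb-async -s -P workloads/workloada -p mongodb.url=mongodb://<host>:27017/ycsb -p recordcount=100000 > outputLoad.txt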




[root@localhost YCSB]# more outputLoad.txt
YCSB Client 0.1
Command line: -db com.yahoo.ycsb.db.AsyncMongoDbClient -s -P workloads/workloada -load
mongo connection created with mongodb://localhost:27017/ycsb?w=1
14:46:49.257 [Thread-1] DEBUG c.a.m.c.c.b.BootstrapConnectionFactory - Simple MongoDB bootstrap to localhost/127.0.0.1:27017.
14:46:50.348 [Thread-1] DEBUG c.a.mongodb.client.ClientImpl - MongoDB Connection closed: MongoDB(49021-->localhost/127.0.0.1:27017)
[OVERALL], RunTime(ms), 1521.0
[OVERALL], Throughput(ops/sec), 657.4621959237344
[CLEANUP], Operations, 1.0
[CLEANUP], AverageLatency(us), 2207.0
[CLEANUP], MinLatency(us), 2206.0
[CLEANUP], MaxLatency(us), 2207.0
[CLEANUP], 95thPercentileLatency(us), 2207.0
[CLEANUP], 99thPercentileLatency(us), 2207.0
[INSERT], Operations, 1000.0
[INSERT], AverageLatency(us), 1113.879
[INSERT], MinLatency(us), 416.0
[INSERT], MaxLatency(us), 382207.0
[INSERT], 95thPercentileLatency(us), 1451.0
[INSERT], 99thPercentileLatency(us), 2131.0
[INSERT], Return=OK, 1000


In MongoDB
[root@localhost ~]# mongo
MongoDB shell version: 2.6.11
connecting to: test
> show dbs
admin  (empty)
local  0.078GB
ycsb   0.078GB
> use ycsb
switched to db ycsb
> show collections
system.indexes
usertable
> db.usertable.find().limit(1).pretty()
{
        "_id" : "user6284781860667377211",
        "field5" : BinData(0,"PE85LUUrJjF2Jzh+PSwsOSBwN0Z/Ljk6OEo1OFd5PzAuKC18J1tpJ0AhOlRpOkIhLzg0NSouLz1+K14lMlYrNSRyJjF+MlhtKyUsIDRoL0RrOyMuJFBjO0g/LzByLiM+OSlsNw=="),
        "field4" : BinData(0,"JipgKiFoIkcxLVBrLUM7NFQhLEotICBgPzZqJjcqMzVwN0w/Ky16JyMiJjooPVBtPD40MDhkIzA0KE9nPkRxMzp0Ol4jMFAlKiN+PlB3KVY3JCcwOV4rNTt8LDZ6NiVgNU5jMg=="),
        "field3" : BinData(0,"LDZ+OkEvPkFxLj1wOz8sNTd2PUkrKF01IUJxL0w9OEZjPyFuIEV/P15pKkxtJjlqOTUkMEZ1KD8+OS0iOCYuNT5gPSQmKFo5OVFjP1tjPUknPC96P0opJCl8J15xJ0IjKkVlPg=="),
        "field2" : BinData(0,"KC4mNT4uPUxxIy9yPjt8LldvMytkJkU3LyN4NjV8J09vIlJ3LTJkOzAqJDsqJVAxISEiOER/Pk9jMjkyLDh4J1s9NUZ7MkNxKllvL10vOl57Mzs8MVI3PTAwLjAyIUozJz52Kg=="),
        "field9" : BinData(0,"LVU3KUozJjkyJFktKS9qIlY3PS94Nlp/IjEiPC1gOzd6IyIqNkU1MC8oJjUkPCRoJS9uKiEyPiY8OFdhPkp7Ojw4K141LTYkJ0s5JDEoKF8tJDAwKiY2MVojOTFoKy4uIj9gOw=="),
        "field8" : BinData(0,"Okc7OFhhJVwtMSs2LkIjJFtnOyQ2KVpxKV0nIVJ/P0w1IUMpM1B1NlcrJltvKjkuNlN7L0krNEZtPkE3L1RjJ1gxL0svODgkIjIwIEFrLkclOV9vOlJ/Ji4+NjF2JzZmIEhzIg=="),
        "field7" : BinData(0,"J1MnJUE3MFspNCg+O0YhKVl9LF0vLCswMjpkL1QpJ1J7J0F7OyxiM0JrOzs6MDRsOjNqM1cxIiEwJVxhOjw2MVRjMzIwLFxnKyY+JElhJzVyI0plIldtLlwnPDN6KlV9MEdjOg=="),
        "field6" : BinData(0,"LyUmKVctK1V3OyRmOCx2K0VtPSloPypsPSIqNUsvPD5+LkIvMDZqLlN9L14hKCJ4MVNzOzB2MV53O0EhMEIpIUl9MkctMDo8PTY+JUtpP1orNUI7NF9nPyp0Ij4iJVYhOTQsNA=="),
        "field1" : BinData(0,"NDBuMkVlIzpyLl0lNCh0Ny9sMzV0J1hhIy4gMEV1OzR8LyosNiQ6NTEgKSU8PEx3P1EtOSE0Lj1yJk1vNCEwOV99Oj96Iy1+JStqKy42LDk8LCpwPU1jKjF+OkdhI14lP1p/Pw=="),
        "field0" : BinData(0,"IS5kM1RvJjYwNTQ8JkcjMEVzJ1knNC4qIkI3IFInLjZ2OlJ3NzkgPUozKlopPDE4Ijl4K0srIl05PCpyOkEtPVwrLjc6MCwuKkl5IkUpIyBgOit8IEcrMS0oNjguNEBzKStmMg==")
}
> db.usertable.count()
1000
> 


Then, run the workload:
./bin/ycsb run mongodb-async -s -P workloads/workloada > outputRun.txt
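The client's thread count and target throughput can also be varied to see how latency behaves under load (a sketch using the standard YCSB client flags; the values are arbitrary examples):
./bin/ycsb run mongodb-async -s -P workloads/workloada -threads 8 -target 1000 -p operationcount=10000 > outputRun.txt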




[root@localhost YCSB]# more outputRun.txt
YCSB Client 0.1
Command line: -db com.yahoo.ycsb.db.AsyncMongoDbClient -s -P workloads/workloada -t
mongo connection created with mongodb://localhost:27017/ycsb?w=1
15:34:18.338 [Thread-1] DEBUG c.a.m.c.c.b.BootstrapConnectionFactory - Simple MongoDB bootstrap to localhost/127.0.0.1:27017.
15:34:19.021 [Thread-1] DEBUG c.a.mongodb.client.ClientImpl - MongoDB Connection closed: MongoDB(49027-->localhost/127.0.0.1:27017)
[OVERALL], RunTime(ms), 848.0
[OVERALL], Throughput(ops/sec), 1179.245283018868
[CLEANUP], Operations, 1.0
[CLEANUP], AverageLatency(us), 2020.0
[CLEANUP], MinLatency(us), 2020.0
[CLEANUP], MaxLatency(us), 2020.0
[CLEANUP], 95thPercentileLatency(us), 2020.0
[CLEANUP], 99thPercentileLatency(us), 2020.0
[READ], Operations, 520.0
[READ], AverageLatency(us), 580.0365384615385
[READ], MinLatency(us), 237.0
[READ], MaxLatency(us), 71359.0
[READ], 95thPercentileLatency(us), 669.0
[READ], 99thPercentileLatency(us), 853.0
[READ], Return=OK, 520
[UPDATE], Operations, 480.0
[UPDATE], AverageLatency(us), 755.9708333333333
[UPDATE], MinLatency(us), 397.0
[UPDATE], MaxLatency(us), 10263.0
[UPDATE], 95thPercentileLatency(us), 964.0
[UPDATE], 99thPercentileLatency(us), 4767.0
[UPDATE], Return=OK, 480


TRAP:
Drop the collection every time before rerunning the benchmark. Because the data is already inserted, the load step will otherwise throw a duplicate-key exception:
[root@localhost YCSB]# ./bin/ycsb load mongodb -s -P workloads/workloada > outputLoad.txt
java -cp /root/YCSB/mongodb-binding/conf:/root/YCSB/conf:/root/YCSB/lib/jackson-mapper-asl-1.9.4.jar:/root/YCSB/lib/HdrHistogram-2.1.4.jar:/root/YCSB/lib/core-0.7.0.jar:/root/YCSB/lib/jackson-core-asl-1.9.4.jar:/root/YCSB/mongodb-binding/lib/logback-classic-1.1.2.jar:/root/YCSB/mongodb-binding/lib/mongodb-binding-0.7.0.jar:/root/YCSB/mongodb-binding/lib/mongodb-async-driver-2.0.1.jar:/root/YCSB/mongodb-binding/lib/slf4j-api-1.6.4.jar:/root/YCSB/mongodb-binding/lib/mongo-java-driver-3.0.3.jar:/root/YCSB/mongodb-binding/lib/logback-core-1.1.2.jar com.yahoo.ycsb.Client -db com.yahoo.ycsb.db.MongoDbClient -s -P workloads/workloada -load
Loading workload...
Starting test.
2016-03-10 15:42:49:871 0 sec: 0 operations; est completion in 0 seconds
DBWrapper: report latency for each error is false and specific error codes to track for latency are: []
Exception while trying bulk insert with 0
com.mongodb.MongoWriteException: insertDocument :: caused by :: 11000 E11000 duplicate key error index: ycsb.usertable.$_id_  dup key: { : "user6284781860667377211" }
        at com.mongodb.MongoCollectionImpl.executeSingleWriteRequest(MongoCollectionImpl.java:487)
        at com.mongodb.MongoCollectionImpl.insertOne(MongoCollectionImpl.java:277)
        at com.yahoo.ycsb.db.MongoDbClient.insert(MongoDbClient.java:270)
        at com.yahoo.ycsb.DBWrapper.insert(DBWrapper.java:208)
        at com.yahoo.ycsb.workloads.CoreWorkload.doInsert(CoreWorkload.java:579)
        at com.yahoo.ycsb.ClientThread.run(Client.java:346)
Error inserting, not retrying any more. number of attempts: 1Insertion Retry Limit: 0
2016-03-10 15:42:50:674 0 sec: 0 operations; est completion in 106751991167300 days 15 hours [CLEANUP: Count=1, Max=2087, Min=2086, Avg=2087, 90=2087, 99=2087, 99.9=2087, 99.99=2087] [INSERT-FAILED: Count=1, Max=78143, Min=78080, Avg=78112, 90=78143, 99=78143, 99.9=78143, 99.99=78143] [INSERT: Count=0, Max=0, Min=9223372036854775807, Avg=�, 90=0, 99=0, 99.9=0, 99.99=0]

Drop the usertable collection:
> db.usertable.drop()
true
> 
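The same thing can be done non-interactively from the command line (a convenience sketch; ycsb is the database name used by the default connection string):
mongo ycsb --eval "db.usertable.drop()"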


Similarly, to use the synchronous driver from MongoDB Inc., load the data:
./bin/ycsb load mongodb -s -P workloads/workloada > outputLoad.txt




outputLoad.txt
[OVERALL], RunTime(ms), 1735.0
[OVERALL], Throughput(ops/sec), 576.3688760806916
[CLEANUP], Operations, 1.0
[CLEANUP], AverageLatency(us), 2169.0
[CLEANUP], MinLatency(us), 2168.0
[CLEANUP], MaxLatency(us), 2169.0
[CLEANUP], 95thPercentileLatency(us), 2169.0
[CLEANUP], 99thPercentileLatency(us), 2169.0
[INSERT], Operations, 1000.0
[INSERT], AverageLatency(us), 1305.7
[INSERT], MinLatency(us), 692.0
[INSERT], MaxLatency(us), 67263.0
[INSERT], 95thPercentileLatency(us), 2181.0
[INSERT], 99thPercentileLatency(us), 5415.0
[INSERT], Return=OK, 1000

In MongoDB
> show collections
system.indexes
usertable
> db.usertable.count()
1000
> 


Then, run the workload:
./bin/ycsb run mongodb -s -P workloads/workloada > outputRun.txt



outputRun.txt
[OVERALL], RunTime(ms), 1776.0
[OVERALL], Throughput(ops/sec), 563.063063063063
[CLEANUP], Operations, 1.0
[CLEANUP], AverageLatency(us), 2067.0
[CLEANUP], MinLatency(us), 2066.0
[CLEANUP], MaxLatency(us), 2067.0
[CLEANUP], 95thPercentileLatency(us), 2067.0
[CLEANUP], 99thPercentileLatency(us), 2067.0
[READ], Operations, 504.0
[READ], AverageLatency(us), 1201.1170634920634
[READ], MinLatency(us), 767.0
[READ], MaxLatency(us), 16199.0
[READ], 95thPercentileLatency(us), 1751.0
[READ], 99thPercentileLatency(us), 1929.0
[READ], Return=OK, 504
[UPDATE], Operations, 496.0
[UPDATE], AverageLatency(us), 1509.0645161290322
[UPDATE], MinLatency(us), 839.0
[UPDATE], MaxLatency(us), 79679.0
[UPDATE], 95thPercentileLatency(us), 1995.0
[UPDATE], 99thPercentileLatency(us), 3913.0
[UPDATE], Return=OK, 496

Difference between synchronous and asynchronous driver usage
While the usability of the driver is critical, its primary reason for existing is to enable maximum performance from a MongoDB server. A series of benchmarks has been created to measure the performance of the Asynchronous driver relative to the MongoDB Inc. supported (legacy) driver.
YCSB (Yahoo! Cloud Serving Benchmark) provides a standard set of workloads for comparing the performance of various data stores. Instead of benchmarking different data stores, the benchmark is used here to compare the relative performance of the legacy MongoDB Java driver and the MongoDB Asynchronous Java Driver. The YCSB results show that the MongoDB Asynchronous Java Driver has lower latency, lower variability in latency, and higher throughput across all of the benchmark scenarios. In addition, its latency increases with a much lower slope as contention for the available connections grows.

Running Workload  < To be updated >
Six things to keep in mind:
·         Set up the database system to test
·         Choose the appropriate DB interface layer
·         Choose the appropriate workload
·         Choose the appropriate runtime parameters (number of client threads, target throughput, etc.; see the combined example after this list)
·         Load the data
·         Execute the workload
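Putting the last four points together, a typical invocation pair might look like this (a sketch; the workload choice, thread count, and record/operation counts are arbitrary examples):
./bin/ycsb load mongodb -s -P workloads/workloadb -p recordcount=1000000 -threads 16 > load.txt
./bin/ycsb run mongodb -s -P workloads/workloadb -p operationcount=1000000 -threads 16 -target 5000 > run.txt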

If you have followed the above steps as-is and installed all of the applications as root, then








An Introduction to Pig

Pig is an open-source, high-level data flow system. Apache Pig allows us to write complex MapReduce transformations using a simple scripting language (roughly 10 lines of Pig script can replace 200 lines of Java). This language is called Pig Latin. Pig Latin defines a set of transformations on a data set such as aggregate, join, and sort. The statements written in a Pig script are then translated by Pig into MapReduce jobs so that they can be executed within Hadoop. Pig Latin may be extended using User Defined Functions (UDFs), which the user can write in Java, Python, or another supported language and then call directly from Pig Latin.
What Does Pig Do?
Pig was designed for performing long series of data operations, making it ideal for three categories of Big Data work:
·         Standard extract-transform-load (ETL) data pipelines,
·         Research on raw data, and
·         Iterative processing of data.
Pig Latin has the following key properties:
·         Easy to program: Complex tasks involving interrelated data transformations can be simplified and encoded as data flow sequences. Pig programs can handle huge tasks, yet they are easy to write and maintain.
·         Optimization opportunities: The way tasks are encoded lets the system optimize their execution automatically, allowing the user to concentrate on semantics rather than efficiency.
·         Extensible: Pig users can create their own functions for special-purpose processing.
Why use Pig when MapReduce already exists?
A Pig program typically needs about 1/20th of the lines of code of an equivalent MapReduce program, and its development time is about 1/16th of that for MapReduce.
Advantages of Pig include:
·         Pig is a much higher-level, declarative language than MapReduce, which increases programmer productivity, reduces duplication of effort, and opens the MapReduce programming model to more users.
·         Pig insulates Hadoop complexity from the user.
·         Pig is similar to SQL query where the user specifies the “what” and leaves the “how” to the underlying processing engine.


Pig is useful for:
·         time-sensitive data loads,
·         processing many data sources, and
·         analytic insight through sampling.
Pig is not useful:
·         When processing really messy formats of completely unstructured data (e.g. video, audio, raw human-readable text).
·         When raw speed matters; Pig is slower than equivalent hand-written MapReduce jobs.
·         When more control is needed to optimize the code.
Modes of User Interaction with Pig
Pig has two execution types:
·         Local mode: To run Pig in local mode, users need access to a single machine; all files are installed and run using the local host and file system. Specify local mode using the -x flag (pig -x local). Note that local mode does not support parallel mapper execution with Hadoop 0.20.x and 1.0.0.
·         MapReduce mode: To run Pig in MapReduce mode, users need access to a Hadoop cluster and an HDFS installation. MapReduce mode is the default; users can, but do not need to, specify it with the -x flag (pig or pig -x mapreduce).
Pig allows three modes of user interaction:
·         Interactive mode: The user is presented with an interactive shell called Grunt, which accepts Pig commands. Compilation and execution plan is triggered only when the user asks for output through the STORE command.
·         Batch mode: In this mode the user submits a pre-written Pig script containing a series of Pig commands, typically ending with STORE (see the invocation sketch after this list). The semantics are identical to interactive mode.
·         Embedded mode: Pig Latin commands can be submitted via method invocations from a Java program. This option permits dynamic construction of Pig Latin programs, as well as dynamic control flow.
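As a quick sketch of the first two modes (the script name myscript.pig is a hypothetical example):
pig                          # interactive mode: opens the Grunt shell, MapReduce mode by default
pig -x local myscript.pig    # batch mode: runs a pre-written Pig script against the local file system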
The Programming Language
There are 3 main/basic parts in Pig programming language:
·         The first step in a Pig program is to LOAD the data the user wants to manipulate from HDFS.
LOAD: The data to be processed is stored in Hadoop HDFS. In order for a Pig program to access this data and transform it, the program must first tell Pig what file (or files) it requires, which is done through the LOAD 'data_file' statement (where 'data_file' specifies either an HDFS file or a directory). If a directory is specified, all the files in that directory are loaded into the program. If the data is stored in a file format that is not natively accessible to Pig, you can optionally add a USING clause to the LOAD statement to specify a user-defined function that can read in and interpret the data.
·         Then the user runs the data through a set of transformations (which, internally, are translated into a set of map and reduce tasks).
TRANSFORM: All the data manipulation happens in the transformation logic. Here one can FILTER out rows that are not of interest, JOIN two sets of data files, GROUP data to build aggregations, ORDER results, and much more.
·         Finally, the user DUMPs the data to the screen or STOREs the results in a file somewhere, as illustrated in the sketch after this list.
DUMP and STORE: The DUMP or STORE command is required to generate the results of a Pig program. The user typically uses the DUMP command to send the output to the screen while debugging Pig programs. If the user needs to do further processing or analysis on the generated output, the DUMP call can be changed to a STORE call so that the results are saved to a file. Note that the DUMP command can be used anywhere in the program to dump intermediate result sets to the screen, which is very useful for debugging.
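A minimal Pig Latin sketch putting LOAD, a few transformations, and DUMP/STORE together (the file path, field names, and types are invented for illustration):
visits  = LOAD '/data/page_visits' USING PigStorage('\t') AS (user:chararray, url:chararray, duration:int);
long_v  = FILTER visits BY duration > 60;                                -- keep visits longer than a minute
by_user = GROUP long_v BY user;                                          -- build per-user aggregations
counts  = FOREACH by_user GENERATE group AS user, COUNT(long_v) AS n;    -- count long visits per user
DUMP counts;                                                             -- print to the screen while debugging
-- STORE counts INTO '/output/visit_counts';                             -- or persist the result in HDFS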
Pig Compilation and Execution Stages
A Pig program goes through a series of transformation steps before being executed:
·         Parsing: This is the first step. The parser verifies that the program is syntactically correct and that all referenced variables are defined. Type checking and schema inference can also be done by the parser. Other checks, such as verifying the ability to instantiate the classes corresponding to user-defined functions, also occur in this phase. The output of the parser is a canonical logical plan with a one-to-one correspondence between Pig Latin statements and logical operators, arranged in a directed acyclic graph (DAG).
·         Logical optimizer: The logical plan generated by the parser is then passed through a logical optimizer. In this stage, logical optimizations such as projection pushdown are carried out.
·         MapReduce compiler and MapReduce optimizer: The optimized logical plan is then compiled into a series of MapReduce jobs, which pass through another optimization phase. An example of MapReduce-level optimization is using the combiner stage to perform early partial aggregation, in the case of distributive or algebraic aggregation functions.
·         Hadoop Job Manager: The DAG of optimized MapReduce jobs is topologically sorted, and the jobs are submitted to Hadoop for execution in that order. Pig monitors the Hadoop execution status, and the user periodically gets reports on the progress of the overall Pig program. Any warnings or errors that arise during execution are logged and reported to the user.



Pig Data Model
The data model can be defined as follows:
·         A relation is a bag of tuples.
·         A bag is a collection of tuples (duplicate tuples are allowed). It is similar to a "table" in an RDBMS, except that Pig does not require the tuple field types to match, or even that the tuples have the same number of fields. A bag is denoted by curly braces {}, e.g. {(Alice, 1.0), (Alice, 2.0)}.
·         A tuple is an ordered set of fields. Each field is a piece of data of any type (data atom, tuple, or data bag), e.g. (Alice, 1.0) or (Alice, 2.0).
·         A field is a piece of data or a simple atomic value, e.g. 'Alice' or '1.0'.
·         A map is a set of key/value pairs where the keys are string literals and the values can be of any data type (the key is an atom, while the value can be of any type). A map is denoted by square brackets [].
Pig Latin Relational Operators
Loading and Storing:
·         LOAD: Loads data from the file system or other storage into a relation.
·         STORE: Saves a relation to the file system or other storage.
·         DUMP: Prints a relation to the console.
Filtering:
·         FILTER: Removes unwanted rows from a relation.
·         DISTINCT: Removes duplicate rows from a relation.
·         FOREACH…GENERATE: Generates data transformations based on columns of data.
·         STREAM: Transforms a relation using an external program.
Grouping and Joining:
·         JOIN: Joins two or more relations.
·         COGROUP: Groups the data in two or more relations.
·         GROUP: Groups the data in a single relation.
·         CROSS: Creates the cross product of two or more relations.
Sorting:
·         ORDER: Sorts a relation by one or more fields.
·         LIMIT: Limits the size of a relation to a maximum number of tuples.
Combining and Splitting:
·         UNION: Combines two or more relations into one.
·         SPLIT: Splits a relation into two or more relations.
Difference between Pig and Hive
Feature               Pig                 Hive
Schemas/Types         Yes (implicit)      Yes (explicit)
Language              Pig Latin           SQL-like
JDBC/ODBC             No                  Yes (limited)
Server                No                  Optional (Thrift)
DFS Direct Access     Yes (explicit)      Yes (implicit)
Partitions            No                  Yes
Web Interface         No                  Yes