
Analytics by SQL and Spark using Apache Zeppelin





#spark #hadoop #analytics #apache #zeppelin #scala

I was looking for a dashboard-based query interface for analytics and stumbled upon a cool open source project called Apache Zeppelin.

Zeppelin is a modern web-based tool that lets data scientists collaborate on large-scale data exploration and visualization projects. It is a notebook-style interpreter that enables collaborative analysis sessions to be shared between users. Zeppelin is independent of the execution framework itself: the current version runs on top of Apache Spark, but it has pluggable interpreter APIs to support other data processing systems. More execution frameworks could be added at a later date, e.g. Apache Flink and Crunch, as well as SQL-like backends such as Hive, Tajo and MRQL.

As their Apache proposal mentions, it has good support for pluggable interpreters (a lot of them), i.e. you can query files, databases, Hadoop, etc. through this interface seamlessly. The application is easy to run on your workstation if you want to try it out: download it from the project site and follow the installation guide.

Run the Zeppelin server daemon and access the UI at http://localhost:8088

We can use different interpreters in notebooks and display the results in a dashboard. I was interested in a plain, simple SQL database, like Postgres.

Create a table named sales and insert some sample data.

create table sales(category varchar, units integer);
insert into sales values('Men-Shirts', 134344);
insert into sales values('Men-Shoes', 56289);
insert into sales values('Men-Wallets', 19377);
insert into sales values('Men-Watches', 345673);
insert into sales values('Women-Shirts', 87477);
insert into sales values('Women-Skirts', 140533);
insert into sales values('Women-Shoes', 77301);
insert into sales values('Electronics-Mobile', 67457);
insert into sales values('Electronics-Tablets', 21983);
insert into sales values('Electronics-Accessories', 865390);

Create a notebook.


Set up the connection properties in the psql interpreter configuration.



Then run it with the %psql interpreter. In the notebook, type in:
%psql  select * from sales


You have the dashboard ready. You can share the graph as a link and run the notebook on a schedule.


Then I decided to use Spark code. Since Spark supports JDBC sources, use that in the Spark context. In Spark, JdbcRDD can be used to connect to a relational data source. RDDs are a unit of compute and storage in Spark, but they lack any information about the structure of the data, i.e. the schema. DataFrames combine RDDs with a schema. To use Postgres as a source, the driver must be loaded before queries can be executed or the schema built. Copy the driver jar to $ZEPPELIN_HOME/interpreter/spark and restart the daemon. If you don't do this, you will not be able to read from Postgres and may get JDBC connection errors such as "No suitable driver found".

Use the notebook to provide the Spark code.
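A sketch of such a paragraph is below; it assumes a local PostgreSQL database named mydb holding the sales table created earlier (the URL and credentials are placeholders), and uses the DataFrame JDBC source through the sqlContext that Zeppelin provides.

// Load the Postgres table as a DataFrame via the JDBC source
// (connection details below are assumptions; adjust them to your setup)
val salesDF = sqlContext.read.format("jdbc").options(Map(
  "url"     -> "jdbc:postgresql://localhost:5432/mydb?user=postgres&password=postgres",
  "driver"  -> "org.postgresql.Driver",
  "dbtable" -> "sales"
)).load()

// Register it as a temp table so the %sql interpreter can query it as "sales"
salesDF.registerTempTable("sales")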

In the %sql interpreter (note: it is not %psql), provide:

%sql select * from sales

You only have to schedule the %sql notebook; the dashboard is updated with the newly inserted data whenever the cron job is triggered.



JSON parsing, the Scala way



Most Java developers are familiar with JSON parsing and object mapping using the Jackson library's ObjectMapper, which serializes POJOs to JSON strings and back. In Scala, Play's JSON inception mechanism provides a neat way to serialize JSON. It relies on Scala macros (a macro is a piece of Scala code, executed at compile time, which manipulates and modifies the AST: compile-time metaprogramming). The macro introspects code at compile time using the Scala reflection API, has access to all imports and implicits in the current compile context, and generates code. This means case classes are serialized to JSON automatically. You can also explicitly provide the path to a JSON key and map its value to an object's field, but for simple case classes that is just more boilerplate; use it when you need more powerful mapping and logic for serialized fields. So how does this mapping work? The macro compiler replaces, say, Json.reads[T] by injecting code into the compiled Scala AST (Abstract Syntax Tree), eventually writing out the code that maps JSON fields to object fields. Internally, Play's JSON module uses Jackson's ObjectMapper (ref: play.api.libs.json.jackson.JacksonJson).

You can add the dependency in build.sbt of a minimal-scala project, which will provide the JSON APIs from the Play framework:

libraryDependencies += ("com.typesafe.play" %% "play-ws" % "2.4.2" withSources())

For example, if we have two case classes:

case class Region(name: String, state: Option[String])
case class Sales(count: Int, region: Region)

You have to add the implicit Reads and Writes values for converting between JSON and objects. The values marked implicit will be inserted for you by the compiler, with the type inferred from the context. Compilation will fail if no implicit value of the right type is available in scope.

implicit val readRegion = Json.reads[Region]
implicit val readSales = Json.reads[Sales]
implicit val writeRegion = Json.writes[Region]
implicit val writeSales = Json.writes[Sales]

If you interchange the order of readRegion and readSales, you will get a compilation error. The compiler creates a Reads[T] by resolving the case class fields and the required implicits at compile time, so if any implicit is missing, compilation breaks with a corresponding error.
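For example, with the Reads declared in the reverse order (a sketch), compilation stops with the error shown below:

// Sketch: Reads[Sales] is derived before an implicit Reads[Region] is in scope,
// so the macro cannot resolve it and compilation fails here.
implicit val readSales  = Json.reads[Sales]
implicit val readRegion = Json.reads[Region]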

 Error:(12, 38) No implicit format for test.Region available.
   implicit val readSales = Json.reads[Sales]
 

An interesting method to try while converting JSON to an object is validate(), which helps pinpoint the path of the error.

Executing the following program:
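(A sketch is shown here; the test inputs and the toSales helper are illustrative assumptions chosen to be consistent with the results below.)

import play.api.libs.json._

case class Region(name: String, state: Option[String])
case class Sales(count: Int, region: Region)

object JsonTest extends App {

  implicit val readRegion  = Json.reads[Region]
  implicit val readSales   = Json.reads[Sales]
  implicit val writeRegion = Json.writes[Region]
  implicit val writeSales  = Json.writes[Sales]

  // Validate a JSON string against Sales; on failure print the JsPath and
  // message of every error and return None.
  def toSales(js: String): Option[Sales] =
    Json.parse(js).validate[Sales] match {
      case JsSuccess(sales, _) => Some(sales)
      case JsError(errors) =>
        // println of foreach's Unit result shows up as the "()" lines in the output
        println(errors.foreach { case (path, es) =>
          println("Error at JsPath: " + path)
          es.foreach(e => println(e.message))
        })
        None
    }

  println("This is testing json..")

  println("Test 1"); println("-------")
  // well-formed JSON, optional "state" omitted
  println("Result:" + toSales("""{"count":123,"region":{"name":"West"}}"""))

  println("Test 2"); println("-------")
  // required "name" is missing under "region"
  println("Result:" + toSales("""{"count":123,"region":{"state":"California"}}"""))

  println("Test 3"); println("------")
  // wrong JSON types for "count" and "region.name"
  println("Result:" + toSales("""{"count":"abc","region":{"name":1}}"""))

  println("Test 4"); println("------")
  // object to JSON
  println("Result:" + Json.toJson(Sales(123, Region("West", Some("California")))))
}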



Results:

This is testing json..
Test 1
-------
Result:Some(Sales(123,Region(West,None)))
Test 2
-------
Error at JsPath: /region/name
error.path.missing
()
Result:None
Test 3
------
Error at JsPath: /count
error.expected.jsnumber
Error at JsPath: /region/name
error.expected.jsstring
()
Result:None
Test 4
------
Result:{"count":123,"region":{"name":"West","state":"California"}}
Process finished with exit code 0