Pages

Showing posts with label hadoop. Show all posts
Showing posts with label hadoop. Show all posts

Analytics by SQL and Spark using Apache Zeppelin





#spark #hadoop #analytics #apache #zeppelin #scala

I was looking for a cool dashboard based query interface for analytics. I stumbled upon a cool open source project called Apache Zeppelin,

Zeppelin is a modern web-based tool for the data scientists to collaborate over large-scale data exploration and visualization projects. It is a notebook style interpreter that enable collaborative analysis sessions sharing between users. Zeppelin is independent of the execution framework itself. Current version runs on top of Apache Spark but it has pluggable interpreter APIs to support other data processing systems. More execution frameworks could be added at a later date i.e Apache Flink, Crunch as well as SQL-like backends such as Hive, Tajo, MRQL.

As their apache proposal mentioned, it does have good support for pluggable interpreters (a lot), ie. you can query files, databases, hadoop etc using this interface seamlessly. This application is easily executable in you workstation, if you want to try out. Download from the project site and follow the installation guide.

Run the zeppelin server daemon, and access the UI at http://localhost:8088

We can use different interpreters in notebooks and display the results in dashboard. I was interested in plain simple SQL db, like postgre.

create a tables sales and insert some sample data.

create table sales(category varchar, units integer);
insert into sales values('Men-Shirts', 134344);
insert into sales values('Men-Shoes', 56289);
insert into sales values('Men-Wallets', 19377);
insert into sales values('Men-Watches', 345673);
insert into sales values('Women-Shirts', 87477);
insert into sales values('Women-Skirts', 140533);
insert into sales values('Women-Shoes', 77301);
insert into sales values('Electronics-Mobile', 67457);
insert into sales values('Electronics-Tablets', 21983);
insert into sales values('Electronics-Accessories', 865390);

Create a notebook,


setup the connection properties in psql interpreter configuration.



and run with %psql interpreter. In the notebook, type in,
%psql  select * from sales


You have the dashboard ready. You can share the graph as a link and run the notebook scheduled.


Then I decided to use the spark code. As it supports jdbc source, use that in the spark context. In Spark, JdbcRDD can be used to connect with a relational data source. RDDs are a unit of compute and storage in Spark but lack any information about the structure of the data i.e. schema. Dataframes combine RDDs with Schema. To support postgre as source, you need the driver loaded to execute the queries or building schema. Copy the driver to $ZEPLLIN_HOME/interpreter/spark and restart the daemon. If you don't do this, you will not be able to source postgre and may get jdbc connection errors like "No suitable driver found" etc.

Use the notebook to provide the spark code,

In the %sql (to be noted, its not %psql) interpreter provide,

%sql select * from sales

You have to schedule the %sql notebook only and the dashboard is updated based on the data inserts when the cron job is triggered.



Simple metastore creation for Hive in MySQL


For Hive, the meta-store is like the system catalog which contains metadata about the tables stored in Hive. This metadata is specified during table creation and reused every time the table is referenced in HiveQL. The database is a namespace for tables, where ‘default’ is used for tables with no user supplied database name. The metadata for table contains list of columns and their types, owner, storage and SerDe information (which I can detail in future posts). It can also contain any user supplied key and value data; which can be used for table statistics. Storage information includes location of the table’s data in the underlying file system, data formats and bucketing information. SerDe (which controls how Hive serializes/deserializes the data in a row) metadata includes the implementation class of serializer and deserializer methods and any supporting information required by that implementation. The partitions can have its own columns and SerDe and storage information which can be used in the future to evolve Hive schema.The metastore uses either a traditional relational database (like MySQL, Oracle) or file system and not HDFS since it is optimized for sequential scans only),thus the fired HiveQL statements are executed slow which only access metadata objects.


its simple to install the metastore.

-install mysql-conector
$ sudo yum install mysql-connector-java
-create a symbolic link in the Hive directory
$ ln -s /usr/share/java/mysql-connector-java.jar /usr/lib/hive/lib/mysqlconnector-java.jar

-create the database for the Hive metastore.cdh4 ships with scripts for derby,mysql,oracle and postgre
$ mysql -u root -p
mysql> CREATE DATABASE hivemetastoredb;
mysql> USE hivemetastoredb;
mysql> SOURCE /usr/lib/hive/scripts/metastore/upgrade/mysql/hive-schema- 0.9.0.mysql.sql;

-create a user for the metastore
mysql>CREATE USER 'hive'@'%' IDENTIFIED BY 'hive';

-grant access for all hosts in the network
mysql> GRANT ALL PRIVILEGES ON hivemetastoredb.* TO hive@'%' WITH GRANT OPTION;
mysql> FLUSH PRIVILEGES;

following entries in the file /etc/hive/conf/hive-sites.xml, if you are trying a jdbc connection
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost/hivemetastoredb</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
</property>
<property>
<name>datanucleus.autoCreateSchema</name>
<value>false</value>
</property>
<property>
<name>datanucleus.fixedDatastore</name>
<value>true</value>
</property>


Nodeable - Realtime Insights

#Nodeable is a good example of generating #insights from #bigdata or the real time trickle feeds. It uses Twitter's Storm for the processing engine Stream reduce. I signed up for a trial account to play around.



The insights like "Most Active" metrics are generated for Amazon Web services status. The reports are generated and tagged in real time. The twitter follow counts are displayed.


It has only some basic set of connectors, but one can create custom connectors using its JSON Schema. The outbound data can be pushed to your own Amazon s3 or Hadoop WebHDFS, which is good for private companies.




The github/rss stream is shown as activity stream.



Sharing an interesting presentation of  Storm real-time computation.
ETE 2012 - Nathan Marz on Storm from Chariot Solutions on Vimeo.

Hadoop meetup @inmobi Bangalore

Had a chance to attend the #hadoop #meetup today at #Inmobi Bangalore.

 Arun Murthy and Suresh Srinivasan from Hortonworks made presentations on next gen Hadoop and HDFS Namenode High Availability respectively.

From Inmobi, they had presentations on Real time analytics done on HBase and Ivory, an opensource  feed processing platform by Srikanth









Dream On!

Creating index in Hive


Simple:
CREATE INDEX idx ON TABLE tbl(col_name) AS 'Index_Handler_QClass_Name' IN TABLE tbl_idx;
As to make pluggable indexing algorithms, one has to mention the associated class name that handles indexing say for eg:-org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler
The index handler classes implement HiveIndexHandler
Full Syntax:
CREATE INDEX index_name
ON TABLE base_table_name (col_name, ...)
AS 'index.handler.class.name'
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name=property_value, ...)]
[IN TABLE index_table_name]
[PARTITIONED BY (col_name, ...)]
[
   [ ROW FORMAT ...] STORED AS ...
   | STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]
[COMMENT "index comment"]
  • WITH DEFERRED REBUILD - for newly created index is initially empty. REBUILD can be used to make the index up to date.
  • IDXPROPERTIES/TBLPROPERTIES - declaring keyspace properties
  • PARTITIONED BY - table columns where in the index get partitioned, if not specified index spans all table partitions
  • ROW FORMAT  - custom SerDe or using native SerDe(Serializer/Deserializer for Hive read/write). A native SerDe is used if ROW FORMAT is not specified
  • STORED AS  - index table storage format like RCFILE or SEQUENCFILE.The user has to uniquely specify tabl_idx name is required for a qualified index name across tables, otherwise they are named automatically. STORED BY - can be HBase (I haven't tried it)

The index can be stored in hive table or as RCFILE in an hdfs path etc. In this case, the implemented  index handler class usesIndexTable() method will return false.When index is created, the generateIndexBuildTaskList(...) in index handler class will generate a plan for building the index.

Consider CompactIndexHandler from Hive distribution,

It  only stores the addresses of HDFS blocks containing that value. The index is stored in hive metastore FieldSchema as _bucketname and _offsets in the index table.

ie the index table contains 3 columns, with _unparsed_column_names_from_field schema (indexed columns), _bucketname(table partition hdfs file having columns),[" _blockoffsets",..."]



See the code from CompactIndexHandler,
   

What's it about Cascading?




Cascading helps manipulating data in Hadoop. It is a framework written in Java which abstracts map reduce that allows to write scripts to read and modify data inside Hadoop. Provides a programming API for defining and executing fault tolerant data processing workflows and a query processing API in which the developers can go without map reduce. There are quite a number of DSLs built on top of Cascading, most noteably Cascalog (written in Clojure) and Scalding (written in Scala). There is Pig data processing API which is similar but SQLy.








Terminology

Taps - streams of source (input) and sink (output)
Tuple - can be considered as a result set. This is a single row with named columns of data being processed. A series of tuples make a stream.All tuples in a stream have the exact same fields.
Pipes - tie operations together when executed upon a Tap. Pipe Assembly is created when pipes are successuvely executed.Pipe assemblies are Directed Acyclic Graphs.
Flows - reusable combinations of source,sink and pipe assemblies.
Cascade - series of flows

What all operations possible? 

Relational - Join, Filter, Aggregate etc
Each - for each row result (tuple)
Group - Groupby
CoGroup - joins for tuples
Every - for every key in group or cogroup, like an aggregate function to all tuples in a group at once
SubAssembly - nesting reusable pipe assemblies into a Pipe

Internally the cascading employs an intelligent planner to convert the pipe assembly to a graph of dependent MapReduce jobs that can be executed on a Hadoop cluster.
 
What are the advantages from a normal map reduce workflow do this Cascading have? (Need to investigate!)

The mythical unstructured data!

As semantic web and big data integration gaining its fus-ro-dah, enterprises are finding a way to harness any available form of information swarming the web and the world

I came across some interesting artcles which gives a concise idea of harnessing metadata from unstructured data....

Lee Dallas says

In some respects it is analogous to hieroglyphics where pictographs carry abstract meaning.  The data may not be easily interpretable by machines but document recognition and capture technologies improve daily. The fact that an error rate still exists in recognition does not mean that the content lacks structure.  Simply that the form it takes is too complex for simple processes to understand.

more here : http://bigmenoncontent.com/2010/09/21/the-myth-of-unstructured-data/

A lot of data growth is happening around these so-called unstructured data types. Enterprises which manage to automate the collection, organization and analysis of these data types, will derive competitive advantage.
Every data element does mean something, though what it means may not always be relevant for you.

more here : http://bigdataintegration.blogspot.in/2012/02/unstructured-data-is-myth.html

 

 

Consistent Hashing

What is a consistent hash function?

A consistent hash function is one which changes minimally as the range of function changes.

What's the advantage of such functions?

This is ideal when set of buckets change over time. Two users with inconsistent but overlapping sets of buckets will map items to the same bucket with high probability. So this eliminates the need of "maintaining" a consistent "state" among all nodes in a network. The algorithm can be used for making consistent assignments or relationships between different sets of data in such a way that if we add or remove items, the algorithm can be recalculated on any machine and produce the same results.

Theory

A view V is a set of buckets where user is aware. A client uses a consistent hash function, f(V,i), maps an object to one of the buckets in the view. Say, assign each of hash buckets to random points on mod 2^n circle (virtually!) where hash key size = n. The hash of object= closest clockwise bucket. These small sets of buckets lie near the object. In this case, all the buckets get roughly same number of items. When kth bucket is added only a 1/k fraction of items move. This means when new node is added only minimum reshuffle is needed, which is the advantage of having a view. There can be a hash structure for the key lookup (a balanced tree) which stores the hash of all nodes (in the view).  When a new node is added its hash value is added to the table.

Suppose there are two nodes A and B three objects 1–3 (mapped to a hash-function’s result range). The objects 3 and 1 are mapped to node A, object 2 to node B. When a node leaves the system, data will get mapped to their adjacent node (in clockwise direction) and when a node enters the system it will get hashed onto the ring and will overtake objects.



As an example, (refer link1, link2), the circle denotes a range of  key values. Say, the points in circle represents 64 bit numbers. Hash the data to get the 64 bit number, which is a point in the circle. Take the IPs of nodes and hash them into 64 bit number and point in the circle. Associate the data to the nodes in the clockwise direction (ie. closest, which can be retrieved from the node in the hash structure).  When a new node is inserted into the hash tree, the data will always be assigned to the closest one only. Everything between this number and one that's next in the ring and that has been picked by a different node previously, is now belong to this node.

The basic idea of consistent hash function is to hash both objects and buckets using the same function. It's one of the best ways to implement APIs that can dynamically scale out and rebalanced. The client applications can calculate which node to contact in order to request or write the data with no metadata server required.


Used by

memcached cluster.
Typically, multiple memcached daemons are started, on different hosts. The clients are passed a list of memcached addresses (IP address and port) and pick one daemon for a given key. This is done via consistent hashing, which always maps the same key K to the same memcached server S. When a server crashes, or a new server is added, consistent hashing makes sure that the ensuing rehashing is minimal. Which means that most keys still map to the same servers, but keys hashing to a removed server are rehashed to a new server. - from A memcached implementation in JGroups


Amazon's Dynamo uses consistent hashing along with replication as a partitioning scheme.
Data is partitioned and replicated using consistent hashing [10], and consistency is facilitated by object versioning [12]. The consistency among replicas during updates is maintained by a quorum-like technique and a decentralized replica synchronization protocol. - from Dynamo: Amazon's Highly Available Key-value Store

Data of a Cassandra table gets partitioned and distributed among the nodes by a consistent hashing function.
Cassandra partitions data across the cluster using consistent hashing [11] but uses an order preserving hash function to do so. In consistent hashing the output range of a hash function is treated as a circular space or "ring" (i.e. the largest hash value wraps around to the smallest hash value). Each node in the system is as-signed a random value within this space which represents its position on the ring. Each data item identified by a key is assigned to a node by hashing the data item's key to yield its position on the ring, and then walking the ring clockwise to fi nd the first node with a position larger than the item's position. This node is deemed the coordinator for this key. The application specifi es this key and the Cassandra uses it to route requests. Thus, each node becomes responsible for the region in the ring between it and its predecessor node on the ring. The principal advantage of consistent hashing is that departure or arrival of a node only aff ects its immediate neighbors and other nodes remain una ffected. - from  Cassandra - A Decentralized Structured Storage System
 Voldemort automatic sharding  of  data. Nodes  can  be added  or  removed  from  a  database  cluster,  and  the system adapts automatically.  Voldemort automatically detects and recovers failed nodes. [refer]

References:
http://www.akamai.com/dl/technical_publications/ConsistenHashingandRandomTreesDistributedCachingprotocolsforrelievingHotSpotsontheworldwideweb.pdf
http://sharplearningcurve.com/blog/2010/09/27/consistent-hashing/
http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html




About Bulk Synchronous Parallel(BSP) model

As an alternative to mapreduce paradigm, there is another parallel computing model  called Bulk Synchronous Parallel(BSP). A BSP computer is defined as a set of processors with local memory, interconnected by a communication mechanism (e. g., a network or shared memory) capable of point-to-point communication, and a barrier synchronization mechanism. It differentiates/decouples the use of local memory from that of remote memory. A BSP program consists of a set of BSP processes and a sequence of super-steps—time intervals bounded by the barrier synchronization. Each processor has its own local memory module, and all other memories are non-local where they are accessed by networking. The communication between processors are non-blocking.The essence of the BSP model is super-step. At the start of super step computations are done locally. Then, using the messaging system in the network, the other processes can handle requests for further computation.The communication and synchronization are decoupled. There exists a barrier synchronization in which the processors wait and sync when all communications are completed. When all processes have invoked the sync method and all messages are delivered, the next super-step begins. Then the messages sent during the previous super-step can be accessed by its recipients.The data locality is an inherent part of this model in which the communication is made only when the peer data in necessary. This is different from mapreduce frameworks in which they do not preserve data locality in consecutive operations. During mapreduce processing, it generally passes input data through either many passes of mapreduce or mapreduce iteration in order to derive final results which makes communication cost added on to the processing cost. So BSP is useful with many programs requiring iterations and recursions.



Apache Hama  is one such project enabling hadoop to leverage BSP. Google Pregel uses BSP for large scale mining of graphs.

reference:

http://en.wikipedia.org/wiki/Bulk_synchronous_parallel
http://incubator.apache.org/hama/

Using Avro to serialize logs in log4j


I have written about serialization mechanism of Protocol Buffers previously. Similarly, Apache Avro provides a better serialization framework. 

It provide features like:

 - Independent Schema -  use different schemas for serialization and de-serialization
 - Binary serialization - compact data encoding, and faster data processing
 - Dynamic typing - serialization and deserialization without code generation

 We can encode data when serializing with Avro: binary or JSON. In the binary file schema is  included at the beginning of file. In JSON, the type is defined along with the data. Switching JSON protocol to a binary format in order to achieve better performance is pretty straightforward with Avro. This means less type information needs to be sent with the data and it stores data with its schema means any program can de-serialize the encoded data, which makes a good candidate for RPC.

 In Avro 1.5 we have to use (this is different from previous versions which had no factory for encoders)
 - org.apache.avro.io.EncoderFactory.binaryEncoder(OutputStream out, BinaryEncoder reuse) for binary
 - org.apache.avro.io.EncoderFactory.jsonEncoder(Schema schema, OutputStream out) for JSON

 The values (Avro supported value types) are put for the schema field name as the key
 in a set of name-value pairs called  GenericData.Record

 Avro supported value types are
  Primitive Types - null, boolean, int, long, float, double, bytes, string
  Complex Types - Records, Enums, Arrays, Maps, Unions, Fixed
 
  you can read more about them  here

  An encoded schema definition to be provided for the record instance. To read/write data, just use put/get methods
 
   I have used this serialization mechanism to provide a layout for log4j. The logs will be serialized to avro mechanism.

github project is here - https://github.com/harisgx/avro-log4j
 
   Add the libraries to your project and add new properties to log4j.properties

   log4j.appender.logger_name.layout=com.avrolog.log4j.layout.AvroLogLayout
   log4j.appender.logger_name.layout.Type=json
   log4j.appender.logger_name.layout.MDCKeys=mdcKey
 
 Provide the MDC keys as comma seperated values
 
 
   This is the schema


 
 

Bloom Filters

A Bloom filter is a probabilistic data-structure. This can be used to store a set of data in a space-efficient manner. For eg; a distributed cache called Cache Digests shared as summaries between the nodes to have a global image. 

The data-structure can be used to provide membership queries ie. checkIfDataPresentInStore() If it is to check an element is already inserted in the filter then it will return true, there are no false negatives. But there can be chance if the element not inserted may return true. But the check for that element can be done in the original store ie. the overhead is associated with the rate of false positives. This is different from dictionary in which the hit/miss is deterministic.

For a set of n elements, a bloom filter can be a vector of size m.Initially, all bits are set to 0. For each element e, k hash functions will set k bits in the bit vector to 1. When a query for membership executed, it will check for the bit positions for the set value. If matches all, the queried element is possibly present in the store else, it is sure not present.Each hash function returns the index to set. This means we have to store these m bits per key. So a total of m * N bits of space required. The use of different hash functions results less collision.


Uses
  • Design a spell checker
  • Database join implementation (Oracle)  
  • Peer to peer (P2P) communication and routing  
  • In HBase, the Bloom filter is stored as meta block in the HFile. When a HFile is opened, the bloom filter is loaded into memory and used to determine if a given key is in that store file. This can avoid the scanning region for the key. 
  • and more

I found a java implementation here
Cassandra's java implementation here

Reference

http://en.wikipedia.org/wiki/Bloom_filter
https://issues.apache.org/jira/browse/HBASE-1200
http://wiki.squid-cache.org/SquidFaq/CacheDigests
http://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf

Parallel Databases and Map Reduce

n 2007 Google handled 400 petabytes of data per month.

It was Map Reduce. And the competitors were Dyrad and Hadoop

Architects were reinventing parallel databases.

Data is partitioned across multiple disks. The parallel I/O enables relational operations like join executed in parallel. Teradata, is one of the highly parallel database machines uses shared-nothing architecture Google called it sharding, infinitely scalable almost simply by adding nodes.

In parallel databases the horizontal partitioning is done so that the tuples of a relation are divided among many disks such that each tuple resides on one disk.

The strategies can be :

a. round robin, the ith tuple inserted in the relation to disk i mod n.
b. hashing - send to the result (ie i [1.. n-1]th disk) of hash function h applied to the partitioning attribute value of a tuple.
c. Range partitioning - for a partitioning vector [range] say [3,9], a tuple with partitioning attribute value of 1 will go to disk 0, a tuple with value 6 will go to disk 1, while a tuple with value 12 will go to disk2 etc.

In these databases, the queries/transactions execute in parallel with one another.Relational queries by SQL is good for parallel execution.

In a normal relational db, the execution plan for a query like this,
SELECT YEAR(date) AS year, AVG(fund)
FROM student
GROUP BY year

The execution plan is:
projection(year,fund) -> hash aggregation(year,fund).

In parallel dbs it will be,

projection(year,fund) -> partial hash aggregation(year,fund)
-> partitioning(year) -> final aggregation(year,fund).

Each operator produces a new relation, so the operators can be composed into highly parallel dataflow graphs. By streaming the output of one operator into the input of another operator, the two operators can work in series giving pipelined parallelism. By partitioning the input data among multiple processors and memories, an operator can often be split into many independent operators each working on a part of the data. This partitioned data and execution gives partitioned parallelism



image source and reference: link

Parallel databases uses the parallel data flow model makes programming easy as data is not shared.

MapReduce can be programmed in languages like C, Java, Python and Perl and process flat files in a filesystem, ie no need to have database schema definitions.This is an advantage when documents are processed.It uses a set of input key/value pairs, and produces a set of output key/value pairs. Programmer two functions: map and reduce. Map an input pair and produces a set of intermediate key/value pairs. Then intermediate values are grouped together based on the same intermediate key and passes them to the reduce function. The reduce function accepts the intermediate key and a set of values for that key and merges these values together to result smaller set of values, zero or one output value is produced in the process, thus helps in reducing memory and handles a lot of values making the job scalable. link

HadoopDB is a hybrid of DBMS and MapReduce technologies that targets analytical workload designed to run on a shared-nothing cluster.It uses Hadoop to push the data to existing databases engine to process the SQL queries.In order to push more query logic into databases (e.g. joins), hash-partitioning of data needs to be performed. The data is loaded into HDFS. for processing. HadoopDB's API provide the implementation of Map and Reduce functions.This makes hadoopdb to process huge analytical database with scalability using map reduce framework and performance with exisiting db engines.

refer:link

image source link