Graphs are one of the most widely used concepts in computer science. They appear throughout real-world applications: the GPS on your phone or in your car uses a graph to find the shortest path to your destination, and a social network uses one to suggest people you can add to your list. As the amount of data grows, the graph concepts themselves (breadth-first search, Dijkstra’s algorithm, etc.) stay the same, but the way the graphs are built changes. Take a social network: a person can have hundreds of connections, and each of those connections may in turn be connected to hundreds of other users, who may live in a different country altogether. Storing and traversing all of this in a typical relational database does not scale well. Hence we need technologies that cater to this scale of data, which is where big data systems come in.
So, what will we cover in this article?
- Building graphs on big data stored in HDFS using GraphFrames on top of Apache Spark.
- Analyzing a real-world flights dataset using graphs on top of big data.
GraphFrames
To build and analyze graphs on big data with Apache Spark, we have used an open source library called GraphFrames. At the time of writing, it was the only option I found for building and analyzing graphs on Apache Spark from Java. Spark has an excellent built-in library, GraphX, but it is tightly coupled with Scala and I did not try using it from Java. GraphFrames is also massively scalable, as it is built on top of Spark datasets, and it is much easier to use, as you will see.
Graph Analytics on Airports and Flights dataset
This is a very popular real-life dataset that we are using for our analysis. It is obtained from the OpenFlights airports database (https://openflights.org/data.html). It consists of three datasets:
Airports Dataset
This dataset contains information about airports, as shown below:
- Airport ID : The id given to the airports per row in this dataset
- Airport IATA Code : 3-letter IATA code. Null if not assigned/unknown.
- Airport ICAO Code : 4-letter ICAO code. Null if not assigned.
- Airport Name : Name of the airport
- Country : Country in which the airport is located
- State : State in which the airport is located
Routes Dataset
This dataset contains information about the routes between the airports, as shown below:
- Airline : 2-letter (IATA) or 3-letter (ICAO) code of the airline.
- Airline ID : Unique OpenFlights identifier for airline
- Source airport : 3-letter (IATA) or 4-letter (ICAO) code of the source airport
- Source airport ID : Unique OpenFlights identifier for source airport
- Destination airport : 3-letter (IATA) or 4-letter (ICAO) code of the destination airport
- Destination airport ID : Unique OpenFlights identifier for destination airport
Airlines Dataset
This dataset contains information about the airlines that are represented in this dataset.
- Airline ID : Unique OpenFlights identifier for this airline.
- Name : Name of the airline.
- IATA : 2-letter IATA code, if available.
- ICAO : 3-letter ICAO code, if available.
- Country : Country or territory where airline is incorporated.
Let’s start our analysis using Apache Spark and GraphFrames.
Analysis of Flights data
Before we run any analysis, we will write the regular Spark boilerplate code to get started. We create the Spark session and then start loading our datasets.
SparkConf conf = ...
SparkSession session = ...
Let’s now load the airports dataset. Although this file is stored locally here, it could just as well reside in HDFS or in Amazon S3; Apache Spark is flexible enough to read from either.
Dataset<Row> rawDataAirport = session.read().csv("data/flight/airports.dat");
Now let’s see the first few rows of this data. Spark has a handy show() method for this:
rawDataAirport.show();
We will now give these raw columns a schema. For this, we map each row into a Java POJO, as shown. Our POJO class is Airport.
JavaRDD<Airport> airportsRdd = rawDataAirport.javaRDD().map(row -> {
    Airport ap = new Airport();
    ap.setAirportId(row.getString(0));
    ap.setState(row.getString(2));
    ...
    return ap;
});
We can convert this RDD into a dataset, as a dataset is easier to query and use.
Dataset<Row> airports = session.createDataFrame(airportsRdd.rdd(), Airport.class);
airports.createOrReplaceTempView("airports");
Similarly, we can load the data for routes, as shown.
Dataset<Row> rawDataRoute = session.read().csv("data/flight/routes.dat");
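Again, we load each row into a Java POJO, Route, and store the result in an RDD.
JavaRDD<Route> routesRdd = rawDataRoute.javaRDD().map(row -> {
    Route r = new Route();
    // Column indexes assumed from the field order listed for the routes dataset above.
    r.setSrc(row.getString(2)); // source airport code
    r.setDst(row.getString(4)); // destination airport code
    ...
    return r;
});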
We will convert it back to a dataset as we did earlier for airports.
Dataset<Row> routes = session.createDataFrame(routesRdd.rdd(), Route.class);
Now we have two datasets: airports and routes. Recall that in computer science a graph is built from nodes (vertices) and edges. From the perspective of our graph, the airports are the nodes, and the routes between them are the edges that connect them.
So, to build the graph using GraphFrames, we provide the nodes and edges, that is, airports and routes:
GraphFrame gf = new GraphFrame(airports, routes);
Note: GraphFrames requires an ‘id’ attribute in your vertices and corresponding ‘src’ and ‘dst’ attributes in your edges (check that our POJOs expose these attributes).
Now our graph object is ready, and it sits on top of big data using a Spark and GraphFrames stack. Let’s look at its vertices:
gf.vertices().show();
This would print the vertices (and their attributes).
Now let’s see the airports in India. This is quite easy with GraphFrames:
gf.vertices().filter("country = 'India'").show();
This would show the first few rows of airports in India.
The state column in that output holds the city of each airport, such as ‘Ahmedabad’, ‘Mumbai’, ‘Bhopal’ etc.
Now let’s find the total number of airports in India. This is a simple query, as shown:
System.out.println("Airports in India ----> " + gf.vertices().filter("country = 'India'").count());
This would print the number of airports in India as
Airports in India ----> 125
Note: This result is as per this dataset only. You might need to check on the web for the authenticity of this info.
Let’s now find the total flights going in and out of ‘Indira Gandhi International Airport’ in Delhi.
For this, we compute the degree of each vertex (the number of edges flowing in and out of it) and fire a query that joins the resulting dataset with the airports.
Dataset<Row> degreesDS = gf.degrees();
degreesDS.createOrReplaceTempView("DEGREES");
session.sql("select a.airportName, a.State, a.Country, d.degree from AIRPORTS a, DEGREES d "
    + "where a.airportIataCode = d.id and d.id = 'DEL'").show();
And this would print the data as
That’s quite a lot, right? The degree is 527. Keep in mind that each edge in this dataset is a route, so this counts 527 routes flowing in and out of this airport rather than individual daily flights.
We can also break this number into flights flowing into versus out of this airport. To do this, use the ‘inDegrees()’ and ‘outDegrees()’ methods instead of ‘degrees()’; the rest of the code is similar. I leave this code for you to write on your own, but when you run it you should see the following output:
Flights Going out : Indira Gandhi International Airport , Delhi , India , 264
Flights Going in : Indira Gandhi International Airport , Delhi , India , 263
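To make the three degree measures concrete, here is a minimal plain-Java sketch that counts them on a tiny, made-up route list. It does not use Spark or GraphFrames, and the class name DegreeSketch and the toy routes are assumptions for illustration only; it simply shows what ‘degrees()’, ‘inDegrees()’ and ‘outDegrees()’ count for each vertex.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DegreeSketch {
    // Each edge is a {src, dst} pair, mirroring the src/dst attributes of an edge.
    static Map<String, Integer> outDegrees(List<String[]> edges) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] e : edges) counts.merge(e[0], 1, Integer::sum); // edge leaves e[0]
        return counts;
    }

    static Map<String, Integer> inDegrees(List<String[]> edges) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] e : edges) counts.merge(e[1], 1, Integer::sum); // edge enters e[1]
        return counts;
    }

    // degree = inDegree + outDegree for every vertex that appears in any edge.
    static Map<String, Integer> degrees(List<String[]> edges) {
        Map<String, Integer> counts = new HashMap<>(outDegrees(edges));
        inDegrees(edges).forEach((id, n) -> counts.merge(id, n, Integer::sum));
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical toy routes, not taken from the OpenFlights data.
        List<String[]> routes = List.of(
            new String[] {"DEL", "BOM"},
            new String[] {"BOM", "DEL"},
            new String[] {"DEL", "BLR"});
        System.out.println("DEL out: " + outDegrees(routes).get("DEL"));
        System.out.println("DEL in: " + inDegrees(routes).get("DEL"));
        System.out.println("DEL total: " + degrees(routes).get("DEL"));
    }
}
```

On this toy list DEL has two outgoing routes and one incoming route, so its degree is 3, in the same way that 264 and 263 add up to 527 above.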
Similarly, we can find flights going out of or into other airports too. Also, since this is plain SQL, you can fire a query to find the top airports in the country with respect to the number of flights going in and out.
Let’s now find the direct flights that run between ‘Delhi’ and ‘Mumbai’. This is again a simple query (it assumes the routes and airlines datasets have been registered as the temporary views ROUTES and AIRLINES, just as we did for airports):
session.sql("select a.airlineName, r.src, r.dst from ROUTES r, AIRLINES a "
    + "where r.src = 'DEL' and r.dst = 'BOM' and r.airlineCode = a.airlineId").show();
And this would print the airlines offering direct flights between Delhi and Mumbai.
The same data can be fetched using the concept of ‘triplets’ in a graph, as shown:
gf.triplets().filter("src.airportIataCode='DEL' and dst.airportIataCode='BOM'").show();
This would print the data as
Now let’s find a triplet or direct flight between ‘Delhi’ and ‘Bhuj’
gf.triplets().filter("src.airportIataCode='DEL' and dst.airportIataCode='BHJ'").show();
This would print the result as
This is an empty result, so no direct flight exists. Now we need to find out whether there is a ‘single stop flight’ to this destination. Here comes the power of graphs: we can simply do this using breadth-first search.
Dataset<Row> sfoToBufDS = gf.bfs().fromExpr("id = 'DEL'").toExpr("id = 'BHJ'").maxPathLength(2).run();
As you can see above, the result of the breadth-first search is also a dataset, and we store it in a variable. Next we register this dataset as a temporary view called ‘sfo_to_buf’.
sfoToBufDS.createOrReplaceTempView("sfo_to_buf");
Finally, we will query this temporary view to figure out the state of the starting vertex, the state of the connecting vertex, and the state where the flight ends (in our case this is Bhuj). We will also print the output to the console by invoking the show method.
session.sql("select distinct from.state , v1.state, to.state from sfo_to_buf").show(100);
This would print the result on the screen as
Thus, in order to go from Delhi to Bhuj, you can take a first flight to Mumbai and from Mumbai take a direct second flight to Bhuj. You can beautify the result further by changing the queries and also showing the airlines.
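If you are curious what a breadth-first search with a maximum path length does under the hood, here is a minimal plain-Java sketch on a small, made-up adjacency map. It is not how GraphFrames implements ‘bfs()’ (which works on distributed datasets and returns all shortest paths); the class name BfsSketch, the method shortestPath and the toy routes are assumptions for illustration, and this version returns just one shortest path.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

class BfsSketch {
    // Returns one shortest path from src to dst that uses at most maxPathLength edges,
    // or an empty list if no such path exists.
    static List<String> shortestPath(Map<String, List<String>> adj,
                                     String src, String dst, int maxPathLength) {
        Map<String, String> parent = new HashMap<>();  // vertex -> vertex we reached it from
        Map<String, Integer> depth = new HashMap<>();  // vertex -> number of edges from src
        Deque<String> queue = new ArrayDeque<>();
        depth.put(src, 0);
        queue.add(src);
        while (!queue.isEmpty()) {
            String cur = queue.poll();
            if (cur.equals(dst)) {
                // Walk the parent links back to src to rebuild the path.
                LinkedList<String> path = new LinkedList<>();
                for (String v = dst; v != null; v = parent.get(v)) path.addFirst(v);
                return path;
            }
            if (depth.get(cur) == maxPathLength) continue; // do not expand beyond the limit
            for (String next : adj.getOrDefault(cur, Collections.emptyList())) {
                if (!depth.containsKey(next)) { // visit each vertex only once
                    depth.put(next, depth.get(cur) + 1);
                    parent.put(next, cur);
                    queue.add(next);
                }
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Hypothetical toy routes, not taken from the OpenFlights data.
        Map<String, List<String>> adj = Map.of(
            "DEL", List.of("BOM", "BLR"),
            "BOM", List.of("BHJ"));
        System.out.println(shortestPath(adj, "DEL", "BHJ", 2));
    }
}
```

With a limit of 2 the sketch finds the one-stop path through BOM; with a limit of 1 it returns an empty list, which corresponds to the empty triplet result we saw for the direct flight.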
Last, let’s look at an important and more complicated piece. Suppose I ask you to rank the airports in India by their importance. One way to do this is to check which airports have the most flights going in and out. Another way is to use the PageRank algorithm. PageRank looks at more than the number of flights: it also considers whether an airport is directly connected to other important airports, and based on this each airport gets a score and a ranking. PageRank is bundled with GraphFrames, so the code is just a one-liner, as shown:
Dataset<Row> pg = gf.pageRank().resetProbability(0.15).maxIter(5).run().vertices();
This operation can take a while, as it iterates over all the nodes and edges. Can you imagine how Google does this on the amount of data they have?
Printing this dataset shows each airport with its pagerank value in the right-hand column.
The higher the pagerank value, the more important the airport. To get a more meaningful result, sorted by pagerank, let’s fire another query on top of this dataset, as shown.
pg.createOrReplaceTempView("pageranks");
session.sql("select * from pageranks order by pagerank desc").show(20);
This would print the result as
Looks like our PageRank results are quite good. Our topmost airport is Delhi, followed by Mumbai, then Kolkata and Bangalore.
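To get a feel for how such a score comes about, here is a minimal plain-Java sketch of the textbook PageRank iteration on a tiny, made-up graph. It is not the GraphFrames implementation (which runs distributed and may scale the scores differently); the class name PageRankSketch and the toy routes are assumptions for illustration. The two parameters play the same role as ‘resetProbability’ and ‘maxIter’ in the one-liner above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PageRankSketch {
    // adj maps each vertex to the list of vertices it has an outgoing edge to.
    static Map<String, Double> pageRank(Map<String, List<String>> adj,
                                        double resetProbability, int maxIter) {
        Set<String> vertices = new TreeSet<>(adj.keySet());
        for (List<String> targets : adj.values()) vertices.addAll(targets);
        int n = vertices.size();

        Map<String, Double> rank = new HashMap<>();
        for (String v : vertices) rank.put(v, 1.0 / n); // start with equal scores

        for (int i = 0; i < maxIter; i++) {
            Map<String, Double> next = new HashMap<>();
            // Every vertex gets a small base score from the "random reset".
            for (String v : vertices) next.put(v, resetProbability / n);
            // Each vertex then shares the rest of its score equally among its out-edges.
            // Simplification: a vertex with no outgoing edges just drops its share.
            for (Map.Entry<String, List<String>> e : adj.entrySet()) {
                List<String> targets = e.getValue();
                if (targets.isEmpty()) continue;
                double share = (1 - resetProbability) * rank.get(e.getKey()) / targets.size();
                for (String t : targets) next.merge(t, share, Double::sum);
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        // Hypothetical toy hub: DEL has routes to and from three other airports.
        Map<String, List<String>> adj = Map.of(
            "DEL", List.of("BOM", "BLR", "CCU"),
            "BOM", List.of("DEL"),
            "BLR", List.of("DEL"),
            "CCU", List.of("DEL"));
        pageRank(adj, 0.15, 5).forEach((id, score) -> System.out.println(id + " " + score));
    }
}
```

In this toy graph the hub airport ends up with the highest score because every other airport passes its score on to it, which is the same effect that puts Delhi at the top of our result.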
Summary:
This article explored how easy it is to do complex graph analytics on big data using Apache Spark and GraphFrames. We focused on path analytics here, but graph analytics can help with social graph analytics too.
Learn more on big data and graph analytics:
- Big Data Analytics with Java, published by Packt.
- Apache Spark official website
- GraphFrames official link