Tuesday, November 19, 2013

R and Solr Integration Using Solr's REST APIs


Solr is a popular, fast, and reliable open source enterprise search platform from the Apache Lucene project.  Among many other features, we love its powerful full-text search, hit highlighting, faceted search, and near real-time indexing.  Solr powers the search and navigation features of many of the world's largest internet sites.  Solr, written in Java, uses the Lucene Java search library for full-text indexing and search, and has REST-like HTTP/XML and JSON APIs that make it easy to use from virtually any programming language, including R.  

We invested a significant amount of time integrating our R-based data-management platform with Solr over its HTTP/JSON REST interface.  This integration allows us to index millions of datasets in Solr in real time as they are processed by R.  It took us a few days to stabilize and optimize this approach, and we are proud to share it and its source code with you.  The full source code can be found and downloaded from datadolph.in's git repository.

The script has R functions for:
  • querying Solr and returning matching docs
  • posting a document to Solr (converting an R list to JSON before posting it)
  • deleting all indexes, deleting indexes for a given document type, and deleting indexes for a given category within a document type
      # HTTP calls use RCurl; fromJSON/toJSON come from a JSON package such as RJSONIO
      library(RCurl)
      library(RJSONIO)

      # query a field for the given text and return matching docs
      querySolr <- function(queryText, queryfield="all") {
        response <- fromJSON(getURL(paste(getQueryURL(), queryfield, ":", queryText, sep="")))
        if(!response$responseHeader$status) #if 0, the query succeeded
          return(response$response$docs)
      }
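      # example usage (hypothetical field and term, assuming a running Solr core):
      # docs <- querySolr("basketball", queryfield="category")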

      # delete all indexes from solr server
      deleteAllIndexes <-function() {
        response <- postForm(getUpdateURL(),
                             .opts = list(postfields = '{"delete": {"query":"*:*"}}',
                                          httpheader = c('Content-Type' = 'application/json', 
                                                         Accept = 'application/json'),
                                          ssl.verifypeer=FALSE
                             )
        ) #end of PostForm
        return(fromJSON(response)$responseHeader[1])
      }

      # delete all indexes for a document type from solr server 
      # in this example : type = sports
      deleteSportsIndexes <-function() {
        response <- postForm(getUpdateURL(),
                             .opts = list(postfields = '{"delete": {"query":"type:sports"}}',
                                          httpheader = c('Content-Type' = 'application/json', 
                                                         Accept = 'application/json'),
                                          ssl.verifypeer=FALSE
                             )
        ) #end of PostForm
        return(fromJSON(response)$responseHeader[1])
      }

      # delete indexes for a given category within the sports type from solr server 
      # in this example : type = sports and category: basketball
      deleteSportsIndexesForCat <-function(category) {
        response <- postForm(getUpdateURL(),
                             .opts = list(postfields = 
                               paste('{"delete": {"query":"type:sports AND category:', category, '"}}', sep=""),
                                          httpheader = c('Content-Type' = 'application/json', 
                                                         Accept = 'application/json'),
                                          ssl.verifypeer=FALSE
                             )
        ) #end of PostForm
        return(fromJSON(response)$responseHeader[1])
      }
      #deleteSportsIndexesForCat("basketball")

      #Post a new document to Solr
      postDoc <- function(doc) { 
        solr_update_url <- getUpdateURL()
        jsonst <- toJSON(list(doc))
        response <- postForm(solr_update_url,
                             .opts = list(postfields = jsonst,
                                          httpheader = c('Content-Type' = 'application/json', 
                                                         Accept = 'application/json'),
                                          ssl.verifypeer=FALSE
                             )) #end of PostForm
        ########## Commit explicitly - only if the update alone doesn't take effect ###############
        #return(fromJSON(getURL(getCommitURL())))
        return(fromJSON(response)$responseHeader[1])
      }
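
The functions above rely on a few endpoint helpers (getQueryURL(), getUpdateURL() and getCommitURL()) that are defined in the full script in the repository.  Here is a minimal sketch of what they might look like; the host, port and core name below are placeholders for your own Solr setup:

      # base URL of the Solr core (adjust host, port and core name for your setup)
      solr.base.url <- "http://localhost:8983/solr/collection1"

      # select endpoint returning JSON, ready for a "field:text" query to be appended
      getQueryURL <- function() paste(solr.base.url, "/select?wt=json&q=", sep="")

      # update endpoint that commits after each request and returns JSON
      getUpdateURL <- function() paste(solr.base.url, "/update?commit=true&wt=json", sep="")

      # explicit commit endpoint, used only if updates are not committed automatically
      getCommitURL <- function() paste(solr.base.url, "/update?commit=true", sep="")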

Happy Coding!

Friday, June 14, 2013

Simulating Map-Reduce in R for Big Data Analysis Using Flights Data

We are constantly crunching through large amounts of data and designing ways to process large datasets on a single node, turning to distributed computing only when single-node processing becomes too slow or inefficient.  

We are happy to share with the R community one such map-reduce-like approach we designed in R for a single node to process flights data (available here), which has ~122 million records and occupies 12 GB of space when uncompressed.  We relied heavily on Matthew Dowle's data.table package to load and analyze these large datasets.

It took us a few days to stabilize and optimize this approach, and we are proud to share it and its source code with you.  The full source code can be found and downloaded from datadolph.in's git repository.

Here is how we approached the problem.  First, before loading the datasets into R, we compressed each of the 22 CSV files with gzip, since read.csv can read gzipped files faster than it can read the uncompressed originals.
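
The compression step can be scripted from R itself; a minimal sketch, assuming the raw CSV files sit in flights.folder.path:

# compress each yearly CSV in place (gzip replaces file.csv with file.csv.gz)
csv.files <- list.files(path=flights.folder.path, pattern="\\.csv$", full.names=TRUE)
for(f in csv.files) system(paste("gzip", shQuote(f)))

With the files compressed, loading them into a data.table looks like this: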

# load data.table for fast in-memory aggregation
library(data.table)

# load list of all compressed files
flights.files <- list.files(path=flights.folder.path, pattern="\\.csv\\.gz$")

# read the i-th file into a data.table (read.csv reads .gz files transparently)
flights <- data.table(read.csv(paste(flights.folder.path, flights.files[i], sep=""), stringsAsFactors=F))

Next, we mapped the analysis we wanted to run over each of the datasets.  This included computing flight-level, airline-level, and airport-level aggregates and writing out intermediate results.  Here is example code that computes stats for each airline by year:

getFlightsStatusByAirlines <- function(flights, yr){   
  # by Year
  if(verbose) cat("Getting stats for airlines:", '\n')
  airlines.stats <- flights[, list(
                                   dep_airports=length(unique(origin)),
                                   flights=length(origin),
                                   flights_cancelled=sum(cancelled, na.rm=T),
                                   flights_diverted=sum(diverted, na.rm=T),
                                   flights_departed_late=length(which(depdelay > 0)),
                                   flights_arrived_late=length(which(arrdelay > 0)),
                                   total_dep_delay_in_mins=sum(depdelay[which(depdelay > 0)]),
                                   avg_dep_delay_in_mins=round(mean(depdelay[which(depdelay > 0)])),
                                   median_dep_delay_in_mins=round(median(depdelay[which(depdelay > 0)])),                 
                                   miles_traveled=sum(distance, na.rm=T)
                                 ), by=uniquecarrier][, year:=yr]
  #change col order
  setcolorder(airlines.stats, c("year", colnames(airlines.stats)[-ncol(airlines.stats)]))
  #save this data
  saveData(airlines.stats, paste(flights.folder.path, "stats/5/airlines_stats_", yr, ".csv", sep=""))
  #clear up space
  rm(airlines.stats)  
 # continue.. see git full code
}
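
The saveData() helper used above (and again in the reduce step) is defined in the repository; a minimal stand-in that simply writes a CSV might look like this:

# hypothetical stand-in for the repository's saveData() helper
saveData <- function(dt, path) {
  write.csv(dt, file=path, row.names=FALSE)
}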

Here is a copy of the map function:

#map all calculations 
mapFlightStats <- function(){
  # period = number of yearly .csv.gz files to process
  for(j in 1:period) {
      # recover the year from the file name, e.g. "2008.csv.gz" -> 2008
      yr <- as.integer(gsub("[^0-9]", "", gsub("(.*)(\\.csv)", "\\1", flights.files[j])))
      flights.data.file <- paste(flights.folder.path, flights.files[j], sep="")
      if(verbose) cat(yr, ": Reading : ", flights.data.file, "\n")
      flights <- data.table(read.csv(flights.data.file, stringsAsFactors=F))
      setkeyv(flights, c("year", "uniquecarrier", "dest", "origin", "month")) 
     # call the per-year aggregation functions
      getFlightStatsForYear(flights, yr)
      getFlightsStatusByAirlines(flights, yr)
      getFlightsStatsByAirport(flights, yr)
    }
}

As one can see, we generate intermediate results by airline (and by airport and flight) for each year and store them on disk.  The map function takes less than two hours to run on a MacBook Pro with a 2.3 GHz dual-core processor and 8 GB of memory, and it generates 132 intermediate datasets containing aggregated analysis. 

And finally, we call the reduce function to aggregate the intermediate datasets into the final output (for flights, airlines, and airports):

#reduce all results
reduceFlightStats <- function(){
  # the six subfolders of intermediate results written by the map step
  n <- 1:6
  folder.path <- paste("./raw-data/flights/stats/", n, "/", sep="")
  print(folder.path)
  for(i in n){
    filenames <- paste(folder.path[i], list.files(path=folder.path[i], pattern="\\.csv$"), sep="") 
    # stack the per-year intermediate files into one final dataset
    dt <- do.call("rbind", lapply(filenames, read.csv, stringsAsFactors=F))
    print(nrow(dt))
    saveData(dt, paste("./raw-data/flights/stats/", i, ".csv", sep=""))
  }
}
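
Putting it all together, the pipeline comes down to two calls.  Here is a sketch of the driver code, assuming the functions above (plus getFlightStatsForYear and getFlightsStatsByAirport from the repository) have been sourced; the folder path and flags below are assumptions matching the snippets:

library(data.table)

# configuration assumed by the snippets above
flights.folder.path <- "./raw-data/flights/"      # location of the .csv.gz files
flights.files <- list.files(path=flights.folder.path, pattern="\\.csv\\.gz$")
period <- length(flights.files)                   # one file per year
verbose <- TRUE

mapFlightStats()     # writes per-year aggregates into stats/<n>/
reduceFlightStats()  # stacks them into the final flights, airlines and airports datasets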