19 Aug 2014

Nutch: crawl URLs from Kafka


Apache Nutch is a Java framework for building a distributed, multi-process parser and crawler. It uses a map/reduce engine to spread the work across multiple nodes, so web pages can be parsed and indexed quickly.

Most people get familiar with Nutch through its shell scripts (the nutch and crawl scripts) that ship with the source code.

When I started using this project, I found it difficult to integrate its source code into our application, because we had never worked with anything like the Spring framework and its XML configuration before. On top of that, Nutch comes with many Hadoop-style configuration files, and these files have to be part of the project for the application to work.

1. Follow the steps in the Apache Nutch wiki (install HBase, modify the configuration, compile using ant).
2. Add all compiled libraries to the Netbeans project.
3. Add the modified configuration files to the project, so they end up on its classpath.
4. Work with the Nutch source code.
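Once those configuration files are on the application's classpath, creating a Nutch configuration from Java is straightforward. A minimal sketch (reading storage.data.store.class is just one example of a property that should come from nutch-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.util.NutchConfiguration;

public class NutchConfigCheck {
    public static void main(String[] args) {
        // NutchConfiguration.create() layers nutch-default.xml and nutch-site.xml
        // from the classpath on top of the regular Hadoop configuration
        Configuration conf = NutchConfiguration.create();
        // print one property to verify the files were actually picked up
        System.out.println(conf.get("storage.data.store.class"));
    }
}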

By default, Nutch takes the list of URLs from a local folder and passes it to the Crawler class as Nutch.ARG_SEEDDIR. That is not very convenient for most of us: we usually do not want to take a list of URLs, write them into a file by hand, and then pass that directory to Nutch as the seedDir argument.

Looking into the Nutch source code, the Crawler has another option: it can take the list of URLs directly as an argument (seedList) and create an HDFS directory by itself, so we do not have to prepare a seedDir:
        String crawlId = (String) args.get(Nutch.ARG_CRAWL);
        if (crawlId != null) {
            getConf().set(Nutch.CRAWL_ID_KEY, crawlId);
        }
        String seedDir = null;
        String seedList = (String) args.get(Nutch.ARG_SEEDLIST);
        if (seedList != null) { // takes precedence
            String[] seeds = seedList.split("\\s+");
            // create tmp. dir
            String tmpSeedDir = getConf().get("hadoop.tmp.dir") + "/seed-"
                    + System.currentTimeMillis();
            FileSystem fs = FileSystem.get(getConf());
            Path p = new Path(tmpSeedDir);
            fs.mkdirs(p);
            Path seedOut = new Path(p, "urls");
            OutputStream os = fs.create(seedOut);
            for (String s : seeds) {
                os.write(s.getBytes());
                os.write('\n');
            }
            os.flush();
            os.close();
            cleanSeedDir = true;
            seedDir = tmpSeedDir;
        } else {
            seedDir = (String) args.get(Nutch.ARG_SEEDDIR);
        }

What we want to do is simple: when starting our crawler, give Nutch the seedList as a space-separated Java String instead of a seedDir:

String seedUrl = "www.google.com www.blogger.com";
_logger.info("Processing url [{}]", seedUrl);
Random random = new Random();
String batchId = String.valueOf(random.nextInt());
Configuration nutchConfiguration = NutchConfiguration.create();
nutchConfiguration.set(BATCH_ID, batchId);
String solrUrl = nutchConfiguration.get(SOLR_URL);
String crawlArgs = String.format("-seedurl %s -depth 5 -topN 10", seedUrl);
// Run Crawl tool
ToolRunner.run(nutchConfiguration, new Crawler(),
        tokenize(crawlArgs));
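The tokenize helper used here (and again in the Kafka consumer further down) is not part of Nutch; it is just a small method in our own class that turns the argument string into the String[] that ToolRunner.run expects. A minimal sketch:

// split a command-line style argument string on whitespace
private static String[] tokenize(String args) {
    return args.trim().split("\\s+");
}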

However, this is not going to work. As the source code above shows, seedList takes precedence over seedDir, but the next phase (InjectorJob) never reads the seedList argument; it only reads seedDir (which looks like a bug):


public Map<String, Object> run(Map<String, Object> args) throws Exception {
    getConf().setLong("injector.current.time", System.currentTimeMillis());
    Path input;
    Object path = args.get(Nutch.ARG_SEEDDIR);
    if (path instanceof Path) {
        input = (Path) path;
    } else {
        input = new Path(path.toString());
    }
    ...
}


To make Nutch work with seedList, just put the seedDir argument back into the argument map in the Crawler step:

        if (seedList != null) { // takes precedence
            String[] seeds = seedList.split("\\s+");
            // create tmp. dir
            String tmpSeedDir = getConf().get("hadoop.tmp.dir") + "/seed-"
                    + System.currentTimeMillis();
            FileSystem fs = FileSystem.get(getConf());
            Path p = new Path(tmpSeedDir);
            fs.mkdirs(p);
            Path seedOut = new Path(p, "urls");
            OutputStream os = fs.create(seedOut);
            for (String s : seeds) {
                os.write(s.getBytes());
                os.write('\n');
            }
            os.flush();
            os.close();
            cleanSeedDir = true;
            seedDir = tmpSeedDir;
            args.put(Nutch.ARG_SEEDDIR, seedDir);
        } else {
            seedDir = (String) args.get(Nutch.ARG_SEEDDIR);
        }

and everything will work as expected.
The last step is to get URLs from Kafka: listen on a dedicated Kafka topic, read each message, and pass it to the crawler as the seedList argument:

public void process(String topic, ConsumerConnector consumer) {

    // ask Kafka for a single stream on the given topic
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    // block on the topic; every message is treated as a space-separated seed list
    while (it.hasNext()) {
        try {
            MessageAndMetadata<byte[], byte[]> next = it.next();
            String seedUrl = new String(next.message());
            _logger.info("Processing url [{}]", seedUrl);
            Random random = new Random();
            String batchId = String.valueOf(random.nextInt());
            Configuration nutchConfiguration = NutchConfiguration.create();
            nutchConfiguration.set(BATCH_ID, batchId);
            String solrUrl = nutchConfiguration.get(SOLR_URL);
            String crawlArgs = String.format("-seedurl %s -depth 5 -topN 10", seedUrl);
            // Run Crawl tool
            ToolRunner.run(nutchConfiguration, new Crawler(),
                    tokenize(crawlArgs));
            // index the crawled batch into Solr
            SolrIndexerJob solrIndexerJob = new SolrIndexerJob();
            solrIndexerJob.setConf(nutchConfiguration);
            solrIndexerJob.indexSolr(solrUrl, batchId);
        } catch (Exception ex) {
            java.util.logging.Logger.getLogger(ABCrawler.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
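To feed the crawler, something has to publish seed URLs to that topic. A minimal sketch of the producer side, using the same Kafka 0.8 API as the consumer above (the broker address and the topic name "crawl-urls" are assumptions, replace them with your own):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SeedUrlProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // assumed broker address; point this at your Kafka cluster
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // one message = one space-separated seed list, exactly what process() above expects
        producer.send(new KeyedMessage<String, String>("crawl-urls",
                "www.google.com www.blogger.com"));
        producer.close();
    }
}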
Happy crawling!
