栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

nutch开发(二)

nutch开发(二)

nutch开发(二)

文章目录

nutch开发(二)

开发环境 1.爬取后生成的目录结构

crawldblinkdbsegments 2.阅读TestCrawlDbMerger

createCrawlDb读取crawldb 3.关于索引的建立4.创建一个一步式的爬虫启动类

创建启动类关于如何配置solr服务器的位置 5.Crawler部分代码讲解

核心类核心函数

开发环境

Linux,Ubuntu20.04LSTIDEANutch1.18Solr8.11

转载请声明出处!!!By 鸭梨的药丸哥

1.爬取后生成的目录结构 crawldb

crawldb目录下面存放下载的URL,以及下载的日期、过期时间

linkdb

linkdb目录存放URL的关联关系,是下载完成后分析时创建的

segments

segments目录存储抓取的页面,这些页面是根据层级关系分片的。

“crawl_generate” 生成要获取的一组URL的名字,既生成待下载的URL的集合

“crawl_fetch” 包含获取每个URL的状态

”content“ 包含从每个URL检索的原始内容

“parse_text” 包含每个URL的解析文本(存放每个解析过的URL的文本内容)

“parse_data” 包含从每个URL分析的外部链接和元数据

“crawl_parse” 包含用于更新crawldb的outlink URL(外部链接库)

这里给一张网络图片,这图片清晰滴描述了几个目录和爬取的关系

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1zgfSJou-1644982714814)(F:笔记文档笔记图片3.jpg)]

2.阅读TestCrawlDbMerger

阅读test包下面的org.apache.nutch.crawl.TestCrawlDbMerger,是为了更好理解CrawlDb是什么东东。

createCrawlDb

从createCrawlDb可以看出CrawlDb里面的part-r-00000文件夹里面的文件是Hadoop的SequenceFile格式数据,其中data和index组合成MapFile,MapFile是基于SequenceFile实现的。

SequenceFile 是 Hadoop 的一个重要数据文件类型,它提供key-value的存储,但与传统key-value存储(比如hash表,btree)不同的是,它是 appendonly的,于是你不能对已存在的key进行写操作。每一个key-value记录如下图,不仅保存了key,value值,也保存了他们的长度。MapFile – 一个key-value 对应的查找数据结构,由数据文件/data 和索引文件 /index 组成,数据文件中包含所有需要存储的key-value对,按key的顺序排列。索引文件包含一部分key值,用以指向数据文件的关键位置。

private void createCrawlDb(Configuration config, FileSystem fs, Path crawldb,
      TreeSet init, CrawlDatum cd) throws Exception {
    LOG.debug("* creating crawldb: " + crawldb);
    Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
    
    //可以看出文件夹里面的文件是SequenceFile格式
    //其中整个文件夹是MapFile格式,index是索引文件,data是数据文件
    Option wKeyOpt = MapFile.Writer.keyClass(Text.class);
    org.apache.hadoop.io.SequenceFile.Writer.Option wValueOpt = SequenceFile.Writer.valueClass(CrawlDatum.class);
    
    //生成MapFile
    MapFile.Writer writer = new MapFile.Writer(config, new Path(dir,
        "part-r-00000"), wKeyOpt, wValueOpt);
    Iterator it = init.iterator();
    while (it.hasNext()) {
      String key = it.next();
        //这里看出索引是以url进行索引的
      writer.append(new Text(key), cd);
    }
    writer.close();
  }
读取crawldb

参试读取crawlDb,前面说过crawldb里面的文件是SequenceFile文件格式的,所以我们用SequenceFile.Reader读取data文件,前面说过这个文件夹是MapFile数据格式,由数据文件/data 和索引文件 /index 组成。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import java.io.IOException;


public class TestCrawlDbReader {
    
    @Test
    public void readDbTest() throws IOException {
        Configuration conf=NutchConfiguration.create();
        Path dataPath=new Path("/home/liangwy/IdeaProjects/apache-nutch-1.18/myNutch/crawldb/current/part-r-00000/data");
        FileSystem fs=dataPath.getFileSystem(conf);
        SequenceFile.Reader reader=new SequenceFile.Reader(fs,dataPath,conf);
        Text key=new Text();
        CrawlDatum value=new CrawlDatum();
        while(reader.next(key,value)){
            System.out.println("key:"+key);
            System.out.println("value:"+value);
        }
        reader.close();
    }
}

结果

key:http://nutch.apache.org/
value:Version: 7
Status: 5 (db_redir_perm)
Fetch time: Sun Feb 13 20:29:40 CST 2022
Modified time: Fri Jan 14 20:29:40 CST 2022
Retries since fetch: 0
Retry interval: 2592000 seconds (30 days)
Score: 1.0
Signature: null
metadata: 
 	_pst_=moved(12), lastModified=0: https://nutch.apache.org/
	_rs_=339
	Content-Type=application/octet-stream
	nutch.protocol.code=301
3.关于索引的建立

Nutch把全文检索功能独立出去后,已经摇身一变成网络爬虫了,主要是爬取功能,关于index的功能已经交个其他全文检索服务器实现了,如solr等。

其中,Nutch1.18使用IndexingJob这个启动类去实现索引功能,而IndexingJob背后正在实现去服务器建立索引的实现是各种插件,如indexer-solr插件,所以Nutch才能支持多种全文检索服务器。

索引的建立可以通过以下代码实现,至于具体的全部代码我后面会给出,并将代码发到github上面。

// index
FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs));
IndexingJob indexer = new IndexingJob(getConf());
indexer.index(crawlDb, linkDb,
Arrays.asList(HadoopFSUtil.getPaths(fstats)),false);
4.创建一个一步式的爬虫启动类 创建启动类

下面的Crawler可以实现一整个爬取流程,并在solr服务器上面建立索引。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.fetcher.Fetcher;
import org.apache.nutch.indexer.CleaningJob;
import org.apache.nutch.indexer.IndexingJob;
import org.apache.nutch.parse.ParseSegment;
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.crypto.Data;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;


public class Crawler extends NutchTool implements Tool {
    private static final Logger LOG = LoggerFactory
            .getLogger(MethodHandles.lookup().lookupClass());

    private Configuration conf;

    @Override
    public Configuration getConf() {
        if (conf == null){
            conf = NutchConfiguration.create();
        }
        return conf;
    }

    public String getDate(){
        return new SimpleDateFormat("yyyyMMddHHmmss").format
                (new Date(System.currentTimeMillis()));
    }

    
    @Override
    public int run(String[] strings) throws Exception {
        
        Path rootUrlDir = new Path("/home/liangwy/IdeaProjects/apache-nutch-1.18/urls");
        
        Path dir = new Path("/home/liangwy/IdeaProjects/apache-nutch-1.18","crawl-" + getDate());
        
        int threads = 50;
        
        int depth = 2;
        
        long topN = 10;

        JobConf job = new JobConf(getConf());
        FileSystem fs = FileSystem.get(job);

        if (LOG.isInfoEnabled()) {
            LOG.info("crawl started in: " + dir);
            LOG.info("rootUrlDir = " + rootUrlDir);
            LOG.info("threads = " + threads);
            LOG.info("depth = " + depth);
            if (topN != Long.MAX_VALUE)
                LOG.info("topN = " + topN);
        }

        
        Path crawlDb = new Path(dir + "/crawldb");
        Path linkDb = new Path(dir + "/linkdb");
        Path segments = new Path(dir + "/segments");

        
        Injector injector = new Injector(getConf());
        Generator generator = new Generator(getConf());
        Fetcher fetcher = new Fetcher(getConf());
        ParseSegment parseSegment = new ParseSegment(getConf());
        CrawlDb crawlDbTool = new CrawlDb(getConf());
        linkDb linkDbTool = new linkDb(getConf());

        // initialize crawlDb
        injector.inject(crawlDb, rootUrlDir);

        //爬取次数
        int i;
        for (i = 0; i < depth; i++) {
            // generate new segment
            Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
                    .currentTimeMillis());
            if (segs == null) {
                LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
                break;
            }

            fetcher.fetch(segs[0], threads);  // fetch it
            if (!Fetcher.isParsing(job)) {
                parseSegment.parse(segs[0]);    // parse it, if needed
            }
            crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
        }
        
        //去重
        DeduplicationJob dedup = new DeduplicationJob();
        dedup.setConf(getConf());
        //脚本参数
        String[] dedupArgs = new String[]{crawlDb.toString()};
        //貌似没有封装过的去重方法,这里就调用run函数了
        dedup.run(dedupArgs);
        
        // invert links
        if (i > 0) {
            linkDbTool.invert(linkDb, segments, true, true, false); // invert links

            // index
            FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs));
            IndexingJob indexer = new IndexingJob(getConf());
            indexer.index(crawlDb, linkDb,
                    Arrays.asList(HadoopFSUtil.getPaths(fstats)),false);

            //clean
            CleaningJob cleaningJob = new CleaningJob();
            cleaningJob.setConf(getConf());
            cleaningJob.delete(crawlDb.toString(),false);

        } else {
            LOG.warn("No URLs to fetch - check your seed list and URL filters.");
        }

        if (LOG.isInfoEnabled()) { LOG.info("crawl finished: " + dir); }

        return 0;
    }

    
    @Override
    public Map run(Map args, String crawlId) throws Exception {
        return null;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(NutchConfiguration.create(), new Crawler(), args);
        System.exit(res);
    }

}
关于如何配置solr服务器的位置

在conf/目录下面,有一个index-writers.xml的配置文件,找到indexer_solr_1,里面有配置slor服务器的位置和一些索引建立时的配置信息。

  
    
      
      
      
      
      
      
      
      
    
    
      
        
        
      
      
        
        
      
      
        
      
    
  
5.Crawler部分代码讲解 核心类
Injector injector = new Injector(getConf());  //inject功能
Generator generator = new Generator(getConf());   //generate功能
Fetcher fetcher = new Fetcher(getConf());  //网页fetch
ParseSegment parseSegment = new ParseSegment(getConf());  //parse
CrawlDb crawlDbTool = new CrawlDb(getConf());   //update
linkDb linkDbTool = new linkDb(getConf());   //Invert links

DeduplicationJob dedup = new DeduplicationJob();  // DeleteDuplicates
IndexingJob indexer = new IndexingJob(getConf());  // Index IndexMerger
CleaningJob cleaningJob = new CleaningJob();  // 去除401,302的index
核心函数
//注入urls
injector.inject(crawlDb, rootUrlDir);  

//创建新的segments,在新的segments里面创建抓取列表crawl_generate
generator.generate(crawlDb, segments, -1, topN, System.currentTimeMillis()); 

//给最新的segments进行网页的抓取,生成"crawl_fetch","content"
fetcher.fetch(segs[0], threads);

//解析最新的segments的爬取内容,生成“parse_text” “parse_data” “crawl_parse”
parseSegment.parse(segs[0]);

//跟新crawldb
crawlDbTool.update(crawlDb, segs, true, true);

//去重,NUTCH1.8之后去重工作放在了index之前
dedup.run(dedupArgs);

//链接反转,建立索引前的一步
linkDbTool.invert(linkDb, segments, true, true, false);

//添加索引,去哪个索引服务器添加跟你配置的index-writers.xml有关
indexer.index(crawlDb, linkDb,Arrays.asList(HadoopFSUtil.getPaths(fstats)),false);

//清除302,401等网页,这里的清除操作是添加索引之后
cleaningJob.delete(crawlDb.toString(),false);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742652.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号