nutch开发(二)
开发环境 1.爬取后生成的目录结构
crawldblinkdbsegments 2.阅读TestCrawlDbMerger
createCrawlDb读取crawldb 3.关于索引的建立4.创建一个一步式的爬虫启动类
创建启动类关于如何配置solr服务器的位置 5.Crawler部分代码讲解
核心类核心函数
开发环境Linux,Ubuntu20.04LSTIDEANutch1.18Solr8.11
转载请声明出处!!!By 鸭梨的药丸哥
1.爬取后生成的目录结构 crawldbcrawldb目录下面存放下载的URL,以及下载的日期、过期时间
linkdblinkdb目录存放URL的关联关系,是下载完成后分析时创建的
segmentssegments目录存储抓取的页面,这些页面是根据层级关系分片的。
“crawl_generate” 生成要获取的一组URL的名字,既生成待下载的URL的集合
“crawl_fetch” 包含获取每个URL的状态
”content“ 包含从每个URL检索的原始内容
“parse_text” 包含每个URL的解析文本(存放每个解析过的URL的文本内容)
“parse_data” 包含从每个URL分析的外部链接和元数据
“crawl_parse” 包含用于更新crawldb的outlink URL(外部链接库)
这里给一张网络图片,这图片清晰滴描述了几个目录和爬取的关系
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1zgfSJou-1644982714814)(F:笔记文档笔记图片3.jpg)]
2.阅读TestCrawlDbMerger阅读test包下面的org.apache.nutch.crawl.TestCrawlDbMerger,是为了更好理解CrawlDb是什么东东。
createCrawlDb从createCrawlDb可以看出CrawlDb里面的part-r-00000文件夹里面的文件是Hadoop的SequenceFile格式数据,其中data和index组合成MapFile,MapFile是基于SequenceFile实现的。
SequenceFile 是 Hadoop 的一个重要数据文件类型,它提供key-value的存储,但与传统key-value存储(比如hash表,btree)不同的是,它是 appendonly的,于是你不能对已存在的key进行写操作。每一个key-value记录如下图,不仅保存了key,value值,也保存了他们的长度。MapFile – 一个key-value 对应的查找数据结构,由数据文件/data 和索引文件 /index 组成,数据文件中包含所有需要存储的key-value对,按key的顺序排列。索引文件包含一部分key值,用以指向数据文件的关键位置。
private void createCrawlDb(Configuration config, FileSystem fs, Path crawldb,
TreeSet init, CrawlDatum cd) throws Exception {
LOG.debug("* creating crawldb: " + crawldb);
Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
//可以看出文件夹里面的文件是SequenceFile格式
//其中整个文件夹是MapFile格式,index是索引文件,data是数据文件
Option wKeyOpt = MapFile.Writer.keyClass(Text.class);
org.apache.hadoop.io.SequenceFile.Writer.Option wValueOpt = SequenceFile.Writer.valueClass(CrawlDatum.class);
//生成MapFile
MapFile.Writer writer = new MapFile.Writer(config, new Path(dir,
"part-r-00000"), wKeyOpt, wValueOpt);
Iterator it = init.iterator();
while (it.hasNext()) {
String key = it.next();
//这里看出索引是以url进行索引的
writer.append(new Text(key), cd);
}
writer.close();
}
读取crawldb
参试读取crawlDb,前面说过crawldb里面的文件是SequenceFile文件格式的,所以我们用SequenceFile.Reader读取data文件,前面说过这个文件夹是MapFile数据格式,由数据文件/data 和索引文件 /index 组成。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import java.io.IOException;
public class TestCrawlDbReader {
@Test
public void readDbTest() throws IOException {
Configuration conf=NutchConfiguration.create();
Path dataPath=new Path("/home/liangwy/IdeaProjects/apache-nutch-1.18/myNutch/crawldb/current/part-r-00000/data");
FileSystem fs=dataPath.getFileSystem(conf);
SequenceFile.Reader reader=new SequenceFile.Reader(fs,dataPath,conf);
Text key=new Text();
CrawlDatum value=new CrawlDatum();
while(reader.next(key,value)){
System.out.println("key:"+key);
System.out.println("value:"+value);
}
reader.close();
}
}
结果
key:http://nutch.apache.org/ value:Version: 7 Status: 5 (db_redir_perm) Fetch time: Sun Feb 13 20:29:40 CST 2022 Modified time: Fri Jan 14 20:29:40 CST 2022 Retries since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0 Signature: null metadata: _pst_=moved(12), lastModified=0: https://nutch.apache.org/ _rs_=339 Content-Type=application/octet-stream nutch.protocol.code=3013.关于索引的建立
Nutch把全文检索功能独立出去后,已经摇身一变成网络爬虫了,主要是爬取功能,关于index的功能已经交个其他全文检索服务器实现了,如solr等。
其中,Nutch1.18使用IndexingJob这个启动类去实现索引功能,而IndexingJob背后正在实现去服务器建立索引的实现是各种插件,如indexer-solr插件,所以Nutch才能支持多种全文检索服务器。
索引的建立可以通过以下代码实现,至于具体的全部代码我后面会给出,并将代码发到github上面。
// index FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs)); IndexingJob indexer = new IndexingJob(getConf()); indexer.index(crawlDb, linkDb, Arrays.asList(HadoopFSUtil.getPaths(fstats)),false);4.创建一个一步式的爬虫启动类 创建启动类
下面的Crawler可以实现一整个爬取流程,并在solr服务器上面建立索引。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.fetcher.Fetcher;
import org.apache.nutch.indexer.CleaningJob;
import org.apache.nutch.indexer.IndexingJob;
import org.apache.nutch.parse.ParseSegment;
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.crypto.Data;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
public class Crawler extends NutchTool implements Tool {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
private Configuration conf;
@Override
public Configuration getConf() {
if (conf == null){
conf = NutchConfiguration.create();
}
return conf;
}
public String getDate(){
return new SimpleDateFormat("yyyyMMddHHmmss").format
(new Date(System.currentTimeMillis()));
}
@Override
public int run(String[] strings) throws Exception {
Path rootUrlDir = new Path("/home/liangwy/IdeaProjects/apache-nutch-1.18/urls");
Path dir = new Path("/home/liangwy/IdeaProjects/apache-nutch-1.18","crawl-" + getDate());
int threads = 50;
int depth = 2;
long topN = 10;
JobConf job = new JobConf(getConf());
FileSystem fs = FileSystem.get(job);
if (LOG.isInfoEnabled()) {
LOG.info("crawl started in: " + dir);
LOG.info("rootUrlDir = " + rootUrlDir);
LOG.info("threads = " + threads);
LOG.info("depth = " + depth);
if (topN != Long.MAX_VALUE)
LOG.info("topN = " + topN);
}
Path crawlDb = new Path(dir + "/crawldb");
Path linkDb = new Path(dir + "/linkdb");
Path segments = new Path(dir + "/segments");
Injector injector = new Injector(getConf());
Generator generator = new Generator(getConf());
Fetcher fetcher = new Fetcher(getConf());
ParseSegment parseSegment = new ParseSegment(getConf());
CrawlDb crawlDbTool = new CrawlDb(getConf());
linkDb linkDbTool = new linkDb(getConf());
// initialize crawlDb
injector.inject(crawlDb, rootUrlDir);
//爬取次数
int i;
for (i = 0; i < depth; i++) {
// generate new segment
Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
.currentTimeMillis());
if (segs == null) {
LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
break;
}
fetcher.fetch(segs[0], threads); // fetch it
if (!Fetcher.isParsing(job)) {
parseSegment.parse(segs[0]); // parse it, if needed
}
crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
}
//去重
DeduplicationJob dedup = new DeduplicationJob();
dedup.setConf(getConf());
//脚本参数
String[] dedupArgs = new String[]{crawlDb.toString()};
//貌似没有封装过的去重方法,这里就调用run函数了
dedup.run(dedupArgs);
// invert links
if (i > 0) {
linkDbTool.invert(linkDb, segments, true, true, false); // invert links
// index
FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs));
IndexingJob indexer = new IndexingJob(getConf());
indexer.index(crawlDb, linkDb,
Arrays.asList(HadoopFSUtil.getPaths(fstats)),false);
//clean
CleaningJob cleaningJob = new CleaningJob();
cleaningJob.setConf(getConf());
cleaningJob.delete(crawlDb.toString(),false);
} else {
LOG.warn("No URLs to fetch - check your seed list and URL filters.");
}
if (LOG.isInfoEnabled()) { LOG.info("crawl finished: " + dir); }
return 0;
}
@Override
public Map run(Map args, String crawlId) throws Exception {
return null;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new Crawler(), args);
System.exit(res);
}
}
关于如何配置solr服务器的位置
在conf/目录下面,有一个index-writers.xml的配置文件,找到indexer_solr_1,里面有配置slor服务器的位置和一些索引建立时的配置信息。
5.Crawler部分代码讲解 核心类
Injector injector = new Injector(getConf()); //inject功能 Generator generator = new Generator(getConf()); //generate功能 Fetcher fetcher = new Fetcher(getConf()); //网页fetch ParseSegment parseSegment = new ParseSegment(getConf()); //parse CrawlDb crawlDbTool = new CrawlDb(getConf()); //update linkDb linkDbTool = new linkDb(getConf()); //Invert links DeduplicationJob dedup = new DeduplicationJob(); // DeleteDuplicates IndexingJob indexer = new IndexingJob(getConf()); // Index IndexMerger CleaningJob cleaningJob = new CleaningJob(); // 去除401,302的index核心函数
//注入urls injector.inject(crawlDb, rootUrlDir); //创建新的segments,在新的segments里面创建抓取列表crawl_generate generator.generate(crawlDb, segments, -1, topN, System.currentTimeMillis()); //给最新的segments进行网页的抓取,生成"crawl_fetch","content" fetcher.fetch(segs[0], threads); //解析最新的segments的爬取内容,生成“parse_text” “parse_data” “crawl_parse” parseSegment.parse(segs[0]); //跟新crawldb crawlDbTool.update(crawlDb, segs, true, true); //去重,NUTCH1.8之后去重工作放在了index之前 dedup.run(dedupArgs); //链接反转,建立索引前的一步 linkDbTool.invert(linkDb, segments, true, true, false); //添加索引,去哪个索引服务器添加跟你配置的index-writers.xml有关 indexer.index(crawlDb, linkDb,Arrays.asList(HadoopFSUtil.getPaths(fstats)),false); //清除302,401等网页,这里的清除操作是添加索引之后 cleaningJob.delete(crawlDb.toString(),false);



