1、Enable indexing and replication
Adjust the HBase tables:
For a newly created table:
# REPLICATION_SCOPE => 1 enables replication for the column family; 0 (the default) disables it
create 'device_safe_center_notice', {NAME => 'alert_data', VERSIONS => 3, REPLICATION_SCOPE => 1}
For an existing table:
disable 'device_safe_center_notice'
alter 'device_safe_center_notice', {NAME => 'alert_data', REPLICATION_SCOPE => 1}
enable 'device_safe_center_notice'
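To confirm replication is now on for the column family, a quick check in the HBase shell (a sketch; the exact output layout depends on your HBase version):

```
hbase(main):001:0> describe 'device_safe_center_notice'
# The alert_data column family description should now show REPLICATION_SCOPE => '1'
```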
2、Create the corresponding SolrCloud collection
[root@node2 bin]# solrctl instancedir --generate /opt/module/hbase-indexer/solr_test
[root@node2 conf]# pwd
/opt/module/hbase-indexer/solr_test/conf
[root@node2 conf]# vim managed-schema
Add the index fields here. Field names follow the pattern "tableName_columnFamilyName_columnName".
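The field definitions themselves are not shown above. For illustration, an entry in managed-schema for the dest_ip column queried later might look like the following (a hypothetical sketch; the exact type and attributes depend on your schema):

```xml
<!-- Hypothetical example field: the name follows "tableName_columnFamilyName_columnName" -->
<field name="device_alert_notice_alert_data_dest_ip" type="string" indexed="true" stored="true"/>
```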
Enable hard auto-commit:
[root@node2 conf]# vim solrconfig.xml
<autoCommit>
    <maxTime>${solr.autoCommit.maxTime:10000}</maxTime>
    <openSearcher>false</openSearcher>
</autoCommit>
<autoSoftCommit>
    <maxTime>${solr.autoSoftCommit.maxTime:1000}</maxTime>
</autoSoftCommit>
Run:
[root@node2 conf]# solrctl instancedir --create solr_test /opt/module/hbase-indexer/solr_test/
# Note the collection name below — it is referenced later in the query code
[root@node2 conf]# solrctl collection --create solr_test
Create the morphline-hbase-mapper.xml file under /opt/module/hbase-indexer/solr_test:
[root@node2 solr_test]# pwd
/opt/module/hbase-indexer/solr_test
[root@node2 solr_test]# vim morphline-hbase-mapper.xml
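The mapper file's contents are not shown above. A minimal field-mapping indexer definition typically looks like the following (a sketch assuming the default row-based mapping; adjust the table and column names to your deployment):

```xml
<?xml version="1.0"?>
<!-- Sketch of an hbase-indexer definition: maps the alert_data:dest_ip column of
     device_alert_notice to the Solr field named "tableName_columnFamilyName_columnName" -->
<indexer table="device_alert_notice">
  <field name="device_alert_notice_alert_data_dest_ip" value="alert_data:dest_ip" type="string"/>
</indexer>
```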
Run:
[root@node2 solr_test]# hbase-indexer add-indexer -n solr_test_indexer -c /opt/module/hbase-indexer/solr_test/morphline-hbase-mapper.xml -cp solr.zk=node1,node2,node3:2181/solr -cp solr.collection=solr_test
At this point, new writes to HBase are indexed into Solr automatically. However, rows that already existed in HBase before the indexer was added must be indexed by a one-off batch job.
3、Batch-index existing data
[root@node2 jars]# hadoop jar /opt/cloudera/parcels/CDH/jars/hbase-indexer-mr-1.5-cdh6.3.2-job.jar --hbase-indexer-zk node1,node2,node3:2181 --hbase-indexer-name solr_test_indexer --reducers 0
三、Querying HBase through the Solr secondary index
1、SolrQueryUtil.java
Add the pom dependencies:
<dependency>
    <groupId>org.apache.solr</groupId>
    <artifactId>solr-solrj</artifactId>
    <version>7.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.solr</groupId>
    <artifactId>solr-core</artifactId>
    <version>7.4.0</version>
</dependency>
Code:
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

import java.util.Collections;
import java.util.Optional;

public class SolrQueryUtil {
    public static void main(String[] args) {
        try {
            long start_time = System.currentTimeMillis();
            CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(
                    Collections.singletonList("192.168.18.211:2181,192.168.18.212:2181,192.168.18.213:2181"),
                    Optional.of("/solr")).build();
            long middle_time = System.currentTimeMillis();
            // Query string; the format is "hbaseTableName_columnFamilyName_columnName:columnValue"
            SolrQuery query = new SolrQuery("device_alert_notice_alert_data_dest_ip:163.402.777.809");
            // Number of rows to return
            query.setRows(20);
            // "solr_test" is the collection created for the secondary index
            QueryResponse response = cloudSolrClient.query("solr_test", query);
            long stop_time = System.currentTimeMillis();
            System.out.println("======== connect solr total use time : " + (stop_time - start_time));
            System.out.println("======== solr query use time : " + (stop_time - middle_time));
            for (SolrDocument result : response.getResults()) {
                // The Solr "id" field holds the HBase row key
                System.out.println(result.get("id"));
                System.out.println(PhoenixGetDataUtil.getData((String) result.get("id")));
            }
            System.out.println(response.getResults().getNumFound());
            System.out.println(response.toString());
            cloudSolrClient.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
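The "tableName_columnFamilyName_columnName:value" convention used when building the SolrQuery above can be captured in a small helper. This is a hypothetical utility, not part of the original code; the class and method names are illustrative:

```java
// Hypothetical helper for the field-naming convention used by the secondary index.
public class SolrFieldName {

    // Join table, column family, and column into the indexed Solr field name.
    public static String field(String table, String family, String column) {
        return table + "_" + family + "_" + column;
    }

    // Build a "field:value" query clause. The value is quoted so that characters
    // with special meaning to the Solr query parser (e.g. ':') do not break it.
    public static String clause(String table, String family, String column, String value) {
        return field(table, family, column) + ":\"" + value + "\"";
    }

    public static void main(String[] args) {
        // Same query string as in SolrQueryUtil above, built from its parts
        System.out.println(clause("device_alert_notice", "alert_data", "dest_ip", "163.402.777.809"));
    }
}
```

Passing the resulting string to `new SolrQuery(...)` keeps the table/family/column parts from being hard-coded throughout the query code.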
2、PhoenixGetDataUtil.java
Add the pom dependencies:
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>5.0.0-HBase-2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.el</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.glassfish</groupId>
    <artifactId>javax.el</artifactId>
    <version>3.0.1-b06</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.0.0</version>
</dependency>
Code:
import java.sql.*;
import java.util.Properties;

public class PhoenixGetDataUtil {
    public static String getData(String id) {
        try {
            String driver = "org.apache.phoenix.jdbc.PhoenixDriver";
            Class.forName(driver);
            String url = "jdbc:phoenix:192.168.18.211,192.168.18.212,192.168.18.213:2181";
            // These client-side settings must match the server-side configuration
            Properties props = new Properties();
            props.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
            props.setProperty("phoenix.query.timeoutMs", "1200000");
            props.setProperty("hbase.rpc.timeout", "1000");
            props.setProperty("hbase.client.scanner.timeout.period", "1200000");
            long start_time = System.currentTimeMillis();
            Connection connection = DriverManager.getConnection(url, props);
            long middle_time = System.currentTimeMillis();
            // Query by row key; double-quoted identifiers preserve the case of the HBase names
            PreparedStatement pstmt = connection.prepareStatement(
                    "select \"src_ip\",\"dest_ip\" from \"device_alert_notice\" where \"ID\" = ?");
            pstmt.setString(1, id);
            ResultSet resultSet = pstmt.executeQuery();
            long stop_time = System.currentTimeMillis();
            System.out.println("============== connect hbase total use time :" + (stop_time - start_time));
            System.out.println("============== hbase query use time :" + (stop_time - middle_time));
            String result = null;
            while (resultSet.next()) {
                result = "src_ip : " + resultSet.getString(1) + " dest_ip : " + resultSet.getString(2);
            }
            pstmt.close();
            connection.close();
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}



