Deployed version: elasticsearch-6.5.3
Prepare three servers: 192.168.174.204, 192.168.174.205, 192.168.174.206
Step 1: create an es user on every server; Elasticsearch will not start as root.
useradd es
Step 2: as root, raise the es user's open-file limit.
ulimit -Hn   # check the current hard limit
vi /etc/security/limits.conf and add:
es soft nofile 65536
es hard nofile 65536
Note: log out and back in (or reboot the VM) for the limits to take effect.
Edit vi /etc/security/limits.d/20-nproc.conf (the file name ends in -nproc.conf):
change soft nproc 1024 to soft nproc 2048 (note: ES 6.x's bootstrap check requires at least 4096 threads on a production node, so 4096 is the safer value).
Edit vi /etc/sysctl.conf as root and add the line below, then apply it with sysctl -p:
vm.max_map_count=655360
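A quick sanity check after re-login, as a sketch; the target values in the comments are the ones set in the steps above.

```shell
# Print the effective limits and compare against the targets from above.
echo "open files (hard): $(ulimit -Hn)"   # target: 65536
echo "max user processes: $(ulimit -u)"   # target: >= 2048 (ES 6.x checks for 4096)
sysctl -n vm.max_map_count || true        # target: 655360 (|| true: sysctl may be absent)
```

Run this as the es user; the limits.conf entries only apply to new login sessions.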
Step 3: upload the tarball and unpack it.
tar -zxvf elasticsearch-6.5.3.tar.gz
mv elasticsearch-6.5.3 elasticsearch
Step 4: edit the configuration file elasticsearch.yml.
(The stock template's explanatory comments are omitted below; only the section banners and the settings actually changed are shown. hdp0/hdp1/hdp2 are assumed to map to the three IPs above via /etc/hosts.)
# ---------------------------------- Cluster -----------------------------------
cluster.name: bigdata
# ------------------------------------ Node ------------------------------------
node.name: es-1
# ----------------------------------- Paths ------------------------------------
path.data: /opt/soft/elasticsearch/data
path.logs: /opt/soft/elasticsearch/logs
# ---------------------------------- Network -----------------------------------
network.host: hdp0
node.master: true
node.data: true
# --------------------------------- Discovery ----------------------------------
discovery.zen.ping.unicast.hosts: ["hdp0", "hdp1", "hdp2"]
# Prevent split brain: majority of master-eligible nodes (3 / 2 + 1 = 2)
discovery.zen.minimum_master_nodes: 2
bootstrap.memory_lock: false
http.cors.enabled: true
http.cors.allow-origin: "*"
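Instead of hand-editing the template, the active settings can also be written in one shot. A sketch, using the paths and hostnames from this walkthrough; CONF_DIR defaults to the current directory only so the script can be dry-run locally.

```shell
# Write es-1's elasticsearch.yml in one go; adjust node.name / network.host
# per host (es-2/hdp1, es-3/hdp2).
CONF_DIR=${CONF_DIR:-.}   # normally /opt/soft/elasticsearch/config
cat > "$CONF_DIR/elasticsearch.yml" <<'EOF'
cluster.name: bigdata
node.name: es-1
path.data: /opt/soft/elasticsearch/data
path.logs: /opt/soft/elasticsearch/logs
network.host: hdp0
node.master: true
node.data: true
bootstrap.memory_lock: false
http.cors.enabled: true
http.cors.allow-origin: "*"
discovery.zen.ping.unicast.hosts: ["hdp0", "hdp1", "hdp2"]
discovery.zen.minimum_master_nodes: 2
EOF
```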
Distribute to the other servers:
scp -r elasticsearch hdp1:$PWD
scp -r elasticsearch hdp2:$PWD
On all three servers, grant permissions as root; otherwise es fails to start with "permission denied" errors:
chmod 777 -R elasticsearch
(chown -R es:es elasticsearch is the tighter alternative.)
Modify the other nodes:
hdp1 configuration changes: node.name: es-2, network.host: hdp1
hdp2 configuration changes: node.name: es-3, network.host: hdp2
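The two per-node edits above can also be scripted after the scp; a hypothetical helper, with CONF defaulting to the relative path of the copied file.

```shell
# Patch the two per-node settings in the distributed elasticsearch.yml.
CONF=${CONF:-elasticsearch/config/elasticsearch.yml}
patch_node() {   # usage: patch_node <node.name> <network.host>
  sed -i -e "s/^node.name:.*/node.name: $1/" \
         -e "s/^network.host:.*/network.host: $2/" "$CONF"
}
# on hdp1: patch_node es-2 hdp1
# on hdp2: patch_node es-3 hdp2
```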
Switch to the es user and start the daemon from the bin directory:
./elasticsearch -d
Web test: open http://hdp0:9200 (or curl it); a JSON banner showing cluster_name: bigdata means the node is up.
Since the page above is just a JSON dump and not very readable, use elasticsearch-head instead. Running elasticsearch-head requires grunt, grunt needs the npm package manager, so Node.js must be installed first. Deploy it on one server.
Step 1: download node-v12.16.1-linux-x64.tar.xz
tar -xvf node-v12.16.1-linux-x64.tar.xz                            ## unpack
ln -s /opt/soft/node-v12.16.1-linux-x64/bin/node /usr/bin/node     ## create the symlink
ln -s /opt/soft/node-v12.16.1-linux-x64/bin/npm /usr/bin/npm       ## create the symlink
## check the versions
node -v
npm -v
## Point npm at a China mirror here; downloads are much faster
npm config set registry https://registry.npm.taobao.org            ## taobao mirror
(npm config set registry http://registry.cnpmjs.org is an alternative mirror.)
npm install -g grunt-cli
Download phantomjs
## download
https://bitbucket.org/ariya/phantomjs/downloads/phantomjs-2.1.1-linux-x86_64.tar.bz2
yum install -y bzip2   ## tool for unpacking the .bz2
yum install -y git     ## install Git
## fetch the head plugin with git:
cd /usr/local
git clone git://github.com/mobz/elasticsearch-head.git
## install elasticsearch-head's dependencies
cd ./elasticsearch-head
rm -rf ./node_modules   ## must be removed when reinstalling
npm install --unsafe-perm
Modify the configuration files
1. Edit Gruntfile.js and, under connect -> server -> options, add hostname: '*' so the UI can be reached from any IP.
[root@hadoop01 elasticsearch-head]# vi ./Gruntfile.js
Change it as follows:
connect: {
server: {
options: {
hostname: '*',   // added
port: 9100,
base: '.',
keepalive: true
}
}
}
2. Change the default connection address
[root@hadoop01 elasticsearch-head]# vi ./_site/app.js
Change it as follows:
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") ||
"http://hdp0:9200"; // changed from localhost to hdp0
Startup
1. Start the es cluster as the es user first.
2. Start the head plugin as root:
./node_modules/grunt/bin/grunt server                                    # foreground
nohup ./node_modules/grunt/bin/grunt server > /var/log/head.log 2>&1 &   # background
Check the process:
ps -ef | grep server
Open http://hdp0:9100 in a web browser.
Write a Scala program that connects to es and tries to insert data.
pom.xml
<properties>
    <es.version>6.5.3</es.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${es.version}</version>
        <scope>${scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>${es.version}</version>
        <scope>${scope}</scope>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.25.Final</version>
    </dependency>
</dependencies>
es-config.json
{
"config": {
"cluster.name": "bigdata",
"client.transport.sniff": true
},
"address": [
{
"ip": "hdp0",
"port": 9300
},
{
"ip": "hdp1",
"port": 9300
},
{
"ip": "hdp2",
"port": 9300
}
]
}
ESConfigUtil.scala, which reads the configuration file:
package com.qianfeng.bigdata.realtime.flink.util.es

import java.net.{InetAddress, InetSocketAddress}

import com.qianfeng.bigdata.realtime.flink.constant.QRealTimeConstant
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.http.HttpHost

object ESConfigUtil {
  // cached config instances, built on first access
  var esConfigSocket: ESConfigSocket = null
  var esConfigHttpHost: ESConfigHttpHost = null

  // wraps the settings map plus socket addresses (for the transport client)
  class ESConfigSocket(var config: java.util.HashMap[String, String],
                       var transportAddresses: java.util.ArrayList[InetSocketAddress])

  // wraps the settings map plus HTTP hosts (for the REST client)
  class ESConfigHttpHost(var config: java.util.HashMap[String, String],
                         var transportAddresses: java.util.ArrayList[HttpHost])

  def getConfigSocket(configPath: String): ESConfigSocket = {
    // load the config file from the classpath
    val configStream = this.getClass.getClassLoader.getResourceAsStream(configPath)
    if (null == esConfigSocket) {
      val mapper = new ObjectMapper()
      val configJsonObject = mapper.readTree(configStream)

      // "config" object -> flat key/value map
      val configJsonNode = configJsonObject.get("config")
      val config = {
        val configJsonMap = new java.util.HashMap[String, String]
        val it = configJsonNode.fieldNames()
        while (it.hasNext) {
          val key = it.next()
          configJsonMap.put(key, configJsonNode.get(key).asText())
        }
        configJsonMap
      }

      // "address" array -> list of InetSocketAddress
      val addressJsonNode = configJsonObject.get("address")
      val addressJsonArray = classOf[ArrayNode].cast(addressJsonNode)
      val transportAddresses = {
        val transportAddresses = new java.util.ArrayList[InetSocketAddress]
        val it = addressJsonArray.iterator()
        while (it.hasNext) {
          val detailJsonNode: JsonNode = it.next()
          val ip = detailJsonNode.get("ip").asText()
          val port = detailJsonNode.get("port").asInt()
          transportAddresses.add(new InetSocketAddress(InetAddress.getByName(ip), port))
        }
        transportAddresses
      }
      esConfigSocket = new ESConfigSocket(config, transportAddresses)
    }
    esConfigSocket
  }

  def getConfigHttpHost(configPath: String): ESConfigHttpHost = {
    val configStream = this.getClass.getClassLoader.getResourceAsStream(configPath)
    if (null == esConfigHttpHost) {
      val mapper = new ObjectMapper()
      val configJsonObject = mapper.readTree(configStream)

      // "config" object -> flat key/value map
      val configJsonNode = configJsonObject.get("config")
      val config = {
        val configJsonMap = new java.util.HashMap[String, String]
        val it = configJsonNode.fieldNames()
        while (it.hasNext) {
          val key = it.next()
          configJsonMap.put(key, configJsonNode.get(key).asText())
        }
        configJsonMap
      }

      // "address" array -> list of HttpHost
      val addressJsonNode = configJsonObject.get("address")
      val addressJsonArray = classOf[ArrayNode].cast(addressJsonNode)
      val transportAddresses = {
        val httpHosts = new java.util.ArrayList[HttpHost]
        val it = addressJsonArray.iterator()
        while (it.hasNext) {
          val detailJsonNode: JsonNode = it.next()
          val ip = detailJsonNode.get("ip").asText()
          val port = detailJsonNode.get("port").asInt()
          val schema = "http"
          httpHosts.add(new HttpHost(ip, port, schema))
        }
        httpHosts
      }
      esConfigHttpHost = new ESConfigHttpHost(config, transportAddresses)
    }
    esConfigHttpHost
  }

  // quick self-test
  def main(args: Array[String]): Unit = {
    println(getConfigSocket(QRealTimeConstant.ES_CONFIG_PATH).config.get("cluster.name"))
    println(getConfigHttpHost(QRealTimeConstant.ES_CONFIG_PATH).transportAddresses.get(0).getHostName)
  }
}
The client used to connect to es:
ES6ClientUtil.scala
package com.qianfeng.bigdata.realtime.flink.util.es

import java.net.InetSocketAddress
import java.util

import com.qianfeng.bigdata.realtime.flink.constant.QRealTimeConstant
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateResponse
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable

object ES6ClientUtil {
  private val logger: Logger = LoggerFactory.getLogger(ES6ClientUtil.getClass)

  def buildTransportClient(esConfigPath: String = QRealTimeConstant.ES_CONFIG_PATH): PreBuiltTransportClient = {
    // guard against a missing config path
    if (esConfigPath == null) {
      throw new RuntimeException("esConfigPath is null")
    }
    val esConfig: ESConfigUtil.ESConfigSocket = ESConfigUtil.getConfigSocket(esConfigPath)
    val transAddrs: mutable.Buffer[InetSocketAddress] = esConfig.transportAddresses.asScala

    // copy the key/value pairs from es-config.json into the client settings
    val settings: Settings.Builder = Settings.builder()
    for ((key, value) <- esConfig.config.asScala) {
      settings.put(key, value)
    }

    // build the client and register every node address
    val transportClient = new PreBuiltTransportClient(settings.build())
    for (transAddr <- transAddrs) {
      transportClient.addTransportAddress(new TransportAddress(transAddr))
    }
    transportClient
  }

  def main(args: Array[String]): Unit = {
    val transportClient: PreBuiltTransportClient = buildTransportClient()
    val indexName = "user"
    val esID = "007"
    val value = new util.HashMap[String, String]()
    value.put("name", "daqu")
    value.put("age", "301")
    // upsert: index the document if absent, otherwise apply it as an update
    val indexRequest: IndexRequest = new IndexRequest(indexName, "book", esID).source(value)
    val response: UpdateResponse = transportClient
      .prepareUpdate(indexName, "book", esID)
      .setDoc(value)
      .setUpsert(indexRequest)
      .get()
    println(response.status().getStatus)
  }
}



