(架构图源于参考书籍)
Elasticsearch官方简介:Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。
# 下载
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.15.1-linux-x86_64.tar.gz
# 解压
tar xzvf elasticsearch-7.15.1-linux-x86_64.tar.gz
# 以非 root 用户启动(Elasticsearch 不允许以 root 用户运行)
cd elasticsearch-7.15.1/bin/
./elasticsearch
# 检验是否启动成功,172.16.16.4 为 elasticsearch.yml 中配置绑定的 IP 地址
curl 172.16.16.4:9200
若无法正常启动,则修改配置:
配置文件位于 /home/sam/elasticsearch-7.15.1/config 目录下。
修改 jvm.options 中的内存配置:
-Xms256m
-Xmx256m
修改 elasticsearch.yml(vim elasticsearch.yml):
cluster.name: my-application
node.name: node-1
network.host: 172.16.16.4
http.port: 9200
discovery.seed_hosts: ["172.16.16.4", "::1"]
cluster.initial_master_nodes: ["node-1"]

创建映射
创建 metadata 索引以及 objects 类型的映射:
curl -H "Content-Type: application/json" -XPUT 172.16.16.4:9200/metadata?include_type_name=true -d'{"mappings":{"objects":{"properties":{"name":{"type":"text","index":"false"},"version":{"type":"integer"},"size":{"type":"integer"},"hash":{"type":"text"}}}}}'
ES包封装
该 ES 包封装了以 HTTP 访问 ES 的各种 API 的操作。
package es
import (
"demo/sys"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
)
// metadata is the record stored for one version of an object. It is filled
// by encoding/json; the fields carry no tags, so JSON keys are matched to the
// field names case-insensitively (e.g. "name" -> Name).
type metadata struct {
// Name is the object name.
Name string
// Version is the version number of this record.
Version int
// Size is the size value stored with the record (SearchHashSize reports it
// as the size belonging to a hash).
Size int64
// Hash is the hash value stored with the record.
Hash string
}
// hit is one entry of a search response's hit list; only the "_source"
// member is decoded, into a metadata value.
type hit struct {
Source metadata `json:"_source"`
}
// searchResult mirrors the part of a search response body that this package
// reads: the "hits" object with its total count and its list of hits.
type searchResult struct {
Hits struct {
// Total is decoded as a plain integer.
// NOTE(review): Elasticsearch 7+ returns hits.total as an object
// ({"value": N, "relation": ...}) unless rest_total_hits_as_int=true is
// passed; in that case this field stays 0 and json.Unmarshal reports a
// type error — confirm against the search URLs defined in sys.
Total int
// Hits holds the matching documents in the order the server returned them.
Hits []hit
}
}
// getmetadata fetches the metadata of one specific version of an object.
//
// The URL is built from sys.GetmetadataUrl, the server address found in the
// environment variable named by sys.EsServer, the object name and versionId.
// Per the original note: the index is "metadata", the type is "objects" and
// the document id is the object name joined with the version number, so a
// plain GET retrieves the document without a costly search.
//
// It returns an error when the request fails, when the status is not 200 OK
// (formatted with sys.FailToGetmetadata), or when the body cannot be read or
// decoded. On any error the returned metadata is the zero value.
func getmetadata(name string, versionId int) (meta metadata, e error) {
	endpoint := fmt.Sprintf(sys.GetmetadataUrl, os.Getenv(sys.EsServer), name, versionId)
	r, e := http.Get(endpoint)
	if e != nil {
		return metadata{}, e
	}
	// The body must always be closed, otherwise the connection leaks.
	defer r.Body.Close()

	if r.StatusCode != http.StatusOK {
		return metadata{}, fmt.Errorf(sys.FailToGetmetadata, name, versionId, r.StatusCode)
	}
	result, e := ioutil.ReadAll(r.Body)
	if e != nil {
		return metadata{}, e
	}
	// Deserialize the response body into the metadata structure.
	if e = json.Unmarshal(result, &meta); e != nil {
		return metadata{}, e
	}
	return meta, nil
}
// SearchLatestVersion runs the search described by sys.SearchLatestVersionUrl
// for the given object name and returns the source of the first hit.
//
// When the search yields no hits it returns the zero metadata and a nil
// error, so callers see Version == 0 for an unknown object. It returns an
// error when the request fails, when the status is not 200 OK (formatted with
// sys.FailToSearchLatestmetadata), or when the body cannot be read or decoded.
func SearchLatestVersion(name string) (meta metadata, e error) {
	// The name must be escaped before it is embedded into the URL.
	// NOTE(review): if the template places the name in the query string,
	// url.QueryEscape would be the safer choice — confirm against
	// sys.SearchLatestVersionUrl.
	endpoint := fmt.Sprintf(sys.SearchLatestVersionUrl, os.Getenv(sys.EsServer), url.PathEscape(name))
	r, e := http.Get(endpoint)
	if e != nil {
		return metadata{}, e
	}
	// The body must always be closed, otherwise the connection leaks.
	defer r.Body.Close()

	if r.StatusCode != http.StatusOK {
		return metadata{}, fmt.Errorf(sys.FailToSearchLatestmetadata, r.StatusCode)
	}
	result, e := ioutil.ReadAll(r.Body)
	if e != nil {
		return metadata{}, e
	}

	// Only the hit list is needed here. hits.total is deliberately left out
	// of the decode target: its JSON shape (integer vs. object) depends on
	// the Elasticsearch version and must not turn into a decode error.
	var sr struct {
		Hits struct {
			Hits []hit
		}
	}
	if e = json.Unmarshal(result, &sr); e != nil {
		return metadata{}, e
	}
	// No hits means there is no search result: return the zero value.
	if len(sr.Hits.Hits) == 0 {
		return metadata{}, nil
	}
	return sr.Hits.Hits[0].Source, nil
}
// Getmetadata returns the metadata of an object.
//
// A version of 0 means "no version specified": the lookup is delegated to
// SearchLatestVersion. Any other value is passed to getmetadata to fetch
// exactly that version.
func Getmetadata(name string, version int) (metadata, error) {
	if version != 0 {
		return getmetadata(name, version)
	}
	return SearchLatestVersion(name)
}
// Putmetadata stores a metadata document for the given object name and
// version.
//
// The document body is rendered from sys.metadataJson and sent with HTTP PUT
// to the URL rendered from sys.PutmetadataUrl. When the server answers
// 409 Conflict the call is retried with version+1, repeatedly, until a
// different status is received. 201 Created is success; any other status is
// reported as an error formatted with sys.FailToPutmetadata, carrying the
// status code and the response body.
//
// NOTE(review): sys.metadataJson starts with a lowercase letter and would not
// be accessible from this package as written — confirm the real identifier in
// the sys package.
func Putmetadata(name string, version int, size int64, hash string) error {
	server := os.Getenv(sys.EsServer)
	// A loop replaces the original recursion so that each response body is
	// closed before the next attempt instead of piling up on the call stack.
	for ; ; version++ {
		doc := fmt.Sprintf(sys.metadataJson, name, version, size, hash)
		endpoint := fmt.Sprintf(sys.PutmetadataUrl, server, name, version)
		request, e := http.NewRequest(http.MethodPut, endpoint, strings.NewReader(doc))
		if e != nil {
			return e
		}
		// Elasticsearch 6+ rejects requests that carry a body without an
		// explicit Content-Type. The body is presumably JSON (rendered from
		// sys.metadataJson).
		request.Header.Set("Content-Type", "application/json")

		r, e := http.DefaultClient.Do(request)
		if e != nil {
			return e
		}
		// Best effort: the body is only used for the error message below, and
		// draining it lets the connection be reused.
		result, _ := ioutil.ReadAll(r.Body)
		r.Body.Close()

		switch r.StatusCode {
		case http.StatusCreated:
			return nil
		case http.StatusConflict:
			// This version already exists: try the next one.
			continue
		default:
			return fmt.Errorf(sys.FailToPutmetadata, r.StatusCode, string(result))
		}
	}
}
// AddVersion records a new version of the named object with the given hash
// and size.
//
// It looks up the current latest version through SearchLatestVersion and
// stores a document numbered one higher through Putmetadata. A lookup error
// is returned unchanged and nothing is stored in that case.
func AddVersion(name, hash string, size int64) error {
	latest, err := SearchLatestVersion(name)
	if err != nil {
		return err
	}
	next := latest.Version + 1
	return Putmetadata(name, next, size, hash)
}
// SearchAllVersions returns the metadata records found by the search
// described by sys.SearchAllVersionsUrl, paged with from and size.
//
// With an empty name the search covers all versions of all objects; with a
// non-empty name a "q=name:<name>" filter is appended so that only that
// object's versions are returned. The result is always a non-nil slice on
// success (possibly empty).
//
// It returns an error when the request fails, when the status is not 200 OK,
// or when the body cannot be read or decoded.
func SearchAllVersions(name string, from, size int) ([]metadata, error) {
	endpoint := fmt.Sprintf(sys.SearchAllVersionsUrl, os.Getenv(sys.EsServer), from, size)
	if name != "" {
		// The name goes into the query string, so it must be escaped;
		// otherwise characters such as '&', '+', '%' or spaces would corrupt
		// the request. SearchLatestVersion escapes its name as well.
		endpoint += "&q=name:" + url.QueryEscape(name)
	}
	r, e := http.Get(endpoint)
	if e != nil {
		return nil, e
	}
	// The body must always be closed, otherwise the connection leaks.
	defer r.Body.Close()

	if r.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("search all versions of %q: unexpected status %d", name, r.StatusCode)
	}
	result, e := ioutil.ReadAll(r.Body)
	if e != nil {
		return nil, e
	}

	// Only the hit list is needed here. hits.total is deliberately left out
	// of the decode target: its JSON shape (integer vs. object) depends on
	// the Elasticsearch version and must not turn into a decode error.
	var sr struct {
		Hits struct {
			Hits []hit
		}
	}
	if e = json.Unmarshal(result, &sr); e != nil {
		return nil, e
	}

	metas := make([]metadata, 0, len(sr.Hits.Hits))
	for i := range sr.Hits.Hits {
		metas = append(metas, sr.Hits.Hits[i].Source)
	}
	return metas, nil
}
// Delmetadata sends an HTTP DELETE to the URL rendered from
// sys.DelmetadataUrl for the given object name and version.
//
// The function has no result, so the operation is best effort: request
// construction errors, transport errors and the response status are all
// ignored. The response body is still closed so the connection is not leaked.
func Delmetadata(name string, version int) {
	endpoint := fmt.Sprintf(sys.DelmetadataUrl, os.Getenv(sys.EsServer), name, version)
	request, e := http.NewRequest(http.MethodDelete, endpoint, nil)
	if e != nil {
		// Nothing can be sent; Do must not be called with a nil request.
		return
	}
	r, e := http.DefaultClient.Do(request)
	if e != nil {
		return
	}
	r.Body.Close()
}
// Bucket is one entry of the "group_by_name" aggregation returned by
// SearchVersionStatus. The fields carry no tags; encoding/json matches the
// JSON keys "key", "doc_count" and "min_version" to them case-insensitively.
type Bucket struct {
// Key is the bucket key (presumably the object name, given the aggregation
// is called group_by_name — confirm against sys.SearchVersionStatusJson).
Key string
// Doc_count is the number of documents in the bucket.
Doc_count int
// Min_version holds the "min_version" sub-aggregation; Value is its result.
Min_version struct {
Value float32
}
}
// aggregateResult mirrors the part of an aggregation response body that
// SearchVersionStatus reads: aggregations.group_by_name.buckets.
type aggregateResult struct {
Aggregations struct {
Group_by_name struct {
// Buckets is the list returned to the caller of SearchVersionStatus.
Buckets []Bucket
}
}
}
// SearchVersionStatus runs the aggregation query rendered from
// sys.SearchVersionStatusJson (parameterized with min_doc_count) against the
// URL rendered from sys.SearchVersionStatusUrl and returns the buckets found
// under aggregations.group_by_name.
//
// It returns an error when the request cannot be built or sent, when the
// status is not 200 OK, or when the body cannot be read or decoded.
func SearchVersionStatus(min_doc_count int) ([]Bucket, error) {
	endpoint := fmt.Sprintf(sys.SearchVersionStatusUrl, os.Getenv(sys.EsServer))
	body := fmt.Sprintf(sys.SearchVersionStatusJson, min_doc_count)
	// The query travels in the body of a GET request, so http.Get cannot be
	// used and the request is built by hand.
	request, e := http.NewRequest(http.MethodGet, endpoint, strings.NewReader(body))
	if e != nil {
		return nil, e
	}
	// Elasticsearch 6+ rejects requests that carry a body without an explicit
	// Content-Type. The body is presumably JSON (rendered from
	// sys.SearchVersionStatusJson).
	request.Header.Set("Content-Type", "application/json")

	r, e := http.DefaultClient.Do(request)
	if e != nil {
		return nil, e
	}
	// The body must always be closed, otherwise the connection leaks.
	defer r.Body.Close()

	if r.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("search version status: unexpected status %d", r.StatusCode)
	}
	b, e := ioutil.ReadAll(r.Body)
	if e != nil {
		return nil, e
	}
	var ar aggregateResult
	if e = json.Unmarshal(b, &ar); e != nil {
		return nil, e
	}
	return ar.Aggregations.Group_by_name.Buckets, nil
}
// HasHash reports whether the search rendered from sys.HasHashUrl for the
// given hash matches at least one document, i.e. whether hits.total is
// non-zero.
//
// hits.total is accepted in both forms Elasticsearch produces: a plain
// integer, or an object with a "value" member (the default since 7.0). A
// missing or null total counts as zero.
//
// It returns an error when the request fails, when the status is not 200 OK,
// or when the body cannot be read or decoded.
func HasHash(hash string) (bool, error) {
	endpoint := fmt.Sprintf(sys.HasHashUrl, os.Getenv(sys.EsServer), hash)
	r, e := http.Get(endpoint)
	if e != nil {
		return false, e
	}
	// The body must always be closed, otherwise the connection leaks.
	defer r.Body.Close()

	if r.StatusCode != http.StatusOK {
		return false, fmt.Errorf("search hash %q: unexpected status %d", hash, r.StatusCode)
	}
	b, e := ioutil.ReadAll(r.Body)
	if e != nil {
		return false, e
	}

	// Keep hits.total raw first: decoding it straight into an int would
	// silently yield 0 when the server sends the object form.
	var sr struct {
		Hits struct {
			Total json.RawMessage
		}
	}
	if e = json.Unmarshal(b, &sr); e != nil {
		return false, e
	}
	if len(sr.Hits.Total) == 0 {
		return false, nil
	}

	var total int
	if json.Unmarshal(sr.Hits.Total, &total) != nil {
		// Not a plain integer: expect {"value": N, ...}.
		var obj struct {
			Value int
		}
		if e = json.Unmarshal(sr.Hits.Total, &obj); e != nil {
			return false, e
		}
		total = obj.Value
	}
	return total != 0, nil
}
// SearchHashSize runs the search rendered from sys.SearchHashSizeUrl for the
// given hash and returns the Size stored in the first hit.
//
// When the search yields no hits it returns 0 and a nil error. It returns an
// error when the request fails, when the status is not 200 OK (formatted with
// sys.FailToSearchHashSize), or when the body cannot be read or decoded.
func SearchHashSize(hash string) (size int64, e error) {
	endpoint := fmt.Sprintf(sys.SearchHashSizeUrl, os.Getenv(sys.EsServer), hash)
	r, e := http.Get(endpoint)
	if e != nil {
		return 0, e
	}
	// The body must always be closed, otherwise the connection leaks.
	defer r.Body.Close()

	if r.StatusCode != http.StatusOK {
		return 0, fmt.Errorf(sys.FailToSearchHashSize, r.StatusCode)
	}
	result, e := ioutil.ReadAll(r.Body)
	if e != nil {
		return 0, e
	}

	// Only the hit list is needed here. hits.total is deliberately left out
	// of the decode target: its JSON shape (integer vs. object) depends on
	// the Elasticsearch version and must not turn into a decode error.
	var sr struct {
		Hits struct {
			Hits []hit
		}
	}
	if e = json.Unmarshal(result, &sr); e != nil {
		return 0, e
	}
	if len(sr.Hits.Hits) == 0 {
		return 0, nil
	}
	return sr.Hits.Hits[0].Source.Size, nil
}



