栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

三、区分存储对象的不同版本

三、区分存储对象的不同版本

架构

 (架构图源于参考书籍)

Elasticsearch

官方简介:Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。 作为 Elastic Stack 的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况。

# 下载
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.15.1-linux-x86_64.tar.gz

# 解压
tar xzvf elasticsearch-7.15.1-linux-x86_64.tar.gz

# 以非 root 用户启动
cd /elasticsearch-7.15.1/bin/
./elasticsearch

# 检验是否启动成功,172.16.16.4 为 elasticsearch.yml 配置绑定的 IP 地址
curl 172.16.16.4:9200

若无法正常启动,则修改配置:

/home/sam/elasticsearch-7.15.1/config

修改 jvm.options 中内存配置:
-Xms256m
-Xmx256m

修改 vim elasticsearch.yml :
cluster.name: my-application
node.name: node-1
network.host: 172.16.16.4
http.port: 9200
discovery.seed_hosts: ["172.26.26.4", "::1"]
cluster.initial_master_nodes: ["node-1"]
创建映射

创建 metadata 索引以及 objects 类型的映射:

curl -H "Content-Type: application/json" -XPUT 172.16.16.4:9200/metadata?include_type_name=true -d'{"mappings":{"objects":{"properties":{"name":{"type":"text","index":"false"},"version":{"type":"integer"},"size":{"type":"integer"},"hash":{"type":"text"}}}}}'
ES包封装

该 ES 包封装了以 HTTP 访问 ES 的各种 API 的操作。

package es


import (
	"demo/sys"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strings"
)


type metadata struct {
	Name    string
	Version int
	Size    int64
	Hash    string
}

type hit struct {
	Source metadata `json:"_source"`
}

type searchResult struct {
	Hits struct {
		Total int
		Hits  []hit
	}
}


func getmetadata(name string, versionId int) (meta metadata, e error) {
	// 索引为 metadata ,类型为 objects,文档 id 为对象名称和版本号的拼接
	url := fmt.Sprintf(sys.GetmetadataUrl, os.Getenv(sys.EsServer), name, versionId)
	// 通过 GET URL 可以直接获取该对象的元数据,免除了耗时的搜索操作
	r, e := http.Get(url)
	if e != nil {
		return
	}
	if r.StatusCode != http.StatusOK {
		e = fmt.Errorf(sys.FailToGetmetadata, name, versionId, r.StatusCode)
		return
	}
	result, _ := ioutil.ReadAll(r.Body)
	// 将请求结果反序列化为元数据结构
	json.Unmarshal(result, &meta)
	return
}


func SearchLatestVersion(name string) (meta metadata, e error) {
	// 构建 url 时需要将名称转移成 url 字符
	url := fmt.Sprintf(sys.SearchLatestVersionUrl, os.Getenv(sys.EsServer), url.PathEscape(name))
	r, e := http.Get(url)
	if e != nil {
		return
	}
	if r.StatusCode != http.StatusOK {
		e = fmt.Errorf(sys.FailToSearchLatestmetadata, r.StatusCode)
		return
	}
	result, _ := ioutil.ReadAll(r.Body)
	var sr searchResult
	// 请求结果反序列化
	json.Unmarshal(result, &sr)
	// 如果长度为 0 则没有搜索结果,直接返回
	if len(sr.Hits.Hits) != 0 {
		meta = sr.Hits.Hits[0].Source
	}
	return
}


func Getmetadata(name string, version int) (metadata, error) {
	// 没有指定版本号时默认返回最新版本的元数据
	if version == 0 {
		return SearchLatestVersion(name)
	}
	return getmetadata(name, version)
}


func Putmetadata(name string, version int, size int64, hash string) error {
	doc := fmt.Sprintf(sys.metadataJson, name, version, size, hash)
	client := http.Client{}
	url := fmt.Sprintf(sys.PutmetadataUrl, os.Getenv(sys.EsServer), name, version)
	request, _ := http.NewRequest(http.MethodPut, url, strings.NewReader(doc))
	r, e := client.Do(request)
	if e != nil {
		return e
	}
	if r.StatusCode == http.StatusConflict {
		return Putmetadata(name, version+1, size, hash)
	}
	if r.StatusCode != http.StatusCreated {
		result, _ := ioutil.ReadAll(r.Body)
		return fmt.Errorf(sys.FailToPutmetadata, r.StatusCode, string(result))
	}
	return nil
}


func AddVersion(name, hash string, size int64) error {
	// 获取目前最新的版本
	version, e := SearchLatestVersion(name)
	if e != nil {
		return e
	}
	// 创建一个最新的版本号
	return Putmetadata(name, version.Version+1, size, hash)
}


func SearchAllVersions(name string, from, size int) ([]metadata, error) {
	// 不指定名字时则搜索全部对象的全部版本,指定名字时则搜索某个对象的全部版本
	url := fmt.Sprintf(sys.SearchAllVersionsUrl, os.Getenv(sys.EsServer), from, size)
	if name != "" {
		url += "&q=name:" + name
	}
	r, e := http.Get(url)
	if e != nil {
		return nil, e
	}
	metas := make([]metadata, 0)
	result, _ := ioutil.ReadAll(r.Body)
	var sr searchResult
	json.Unmarshal(result, &sr)
	for i := range sr.Hits.Hits {
		metas = append(metas, sr.Hits.Hits[i].Source)
	}
	return metas, nil
}


func Delmetadata(name string, version int) {
	client := http.Client{}
	url := fmt.Sprintf(sys.DelmetadataUrl, os.Getenv(sys.EsServer), name, version)
	request, _ := http.NewRequest(http.MethodDelete, url, nil)
	client.Do(request)
}

type Bucket struct {
	Key         string
	Doc_count   int
	Min_version struct {
		Value float32
	}
}

type aggregateResult struct {
	Aggregations struct {
		Group_by_name struct {
			Buckets []Bucket
		}
	}
}


func SearchVersionStatus(min_doc_count int) ([]Bucket, error) {
	client := http.Client{}
	url := fmt.Sprintf(sys.SearchVersionStatusUrl, os.Getenv(sys.EsServer))
	body := fmt.Sprintf(sys.SearchVersionStatusJson, min_doc_count)
	request, _ := http.NewRequest(http.MethodGet, url, strings.NewReader(body))
	r, e := client.Do(request)
	if e != nil {
		return nil, e
	}
	b, _ := ioutil.ReadAll(r.Body)
	var ar aggregateResult
	json.Unmarshal(b, &ar)
	return ar.Aggregations.Group_by_name.Buckets, nil
}

func HasHash(hash string) (bool, error) {
	url := fmt.Sprintf(sys.HasHashUrl, os.Getenv(sys.EsServer), hash)
	r, e := http.Get(url)
	if e != nil {
		return false, e
	}
	b, _ := ioutil.ReadAll(r.Body)
	var sr searchResult
	json.Unmarshal(b, &sr)
	return sr.Hits.Total != 0, nil
}

func SearchHashSize(hash string) (size int64, e error) {
	url := fmt.Sprintf(sys.SearchHashSizeUrl, os.Getenv(sys.EsServer), hash)
	r, e := http.Get(url)
	if e != nil {
		return
	}
	if r.StatusCode != http.StatusOK {
		e = fmt.Errorf(sys.FailToSearchHashSize, r.StatusCode)
		return
	}
	result, _ := ioutil.ReadAll(r.Body)
	var sr searchResult
	json.Unmarshal(result, &sr)
	if len(sr.Hits.Hits) != 0 {
		size = sr.Hits.Hits[0].Source.Size
	}
	return
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/345450.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号