参考 有趣的纠删码(erasure code)
文章部分摘取:ErasureCode(纠删码)以更低成本的方式提供近似三副本的可靠性,吸引众多分布式存储/云存储的厂商和用户。可以说纠删码是云存储,尤其是现在广泛使用的对象存储的核心。纠删码(Erasure Coding,EC)是一种编码容错技术,最早是在通信行业解决部分数据在传输中的损耗问题。其基本原理就是把传输的信号分段,加入一定的校验再让各段间发生相互关联,即使在传输过程中丢失部分信号,接收端仍然能通过算法将完整的信息计算出来。在数据存储中,纠删码将数据分割成片段,把冗余数据块扩展和编码,并将其存储在不同的位置,例如磁盘、存储节点或者其他地理位置。
实现流程使用 4 + 2 RS 码冗余策略为例。
PUT 流程 GET 流程 工程目录├─apiServer // 接口服务 │ ├─cmd // 程序入口 │ ├─heartbeat // mq 心跳机制 │ ├─locate // 定位 │ ├─objects // 服务层对象处理 │ └─version // 版本处理 ├─dataServer // 数据服务 │ ├─cmd // 程序入口 │ ├─heartbeat // mq 心跳机制 │ ├─locate // 定位 │ ├─objects // 数据层对象处理 │ └─tmp // 临时对象处理 ├─es // es 搜索引擎 ├─objectStream // 对象流 ├─rabbitmq // mq 中间件 ├─redis // redis 中间件 ├─rs // 纠删码库 ├─schedule // 定时任务 ├─sys // 系统变量 ├─test // 测试 ├─types // 类型定义 └─utils // 工具类
核心是使用 "github.com/klauspost/reedsolomon" 库封装 rs 编码器和译码器,通过接口服务层调用编码器生成数据分片保存在各个数据服务节点,再通过译码器将各个分片整合还原,如果发生数据丢失还会重新创建丢失的分片。部分源码说明如下:
// 创建 RS 对象 func New(dataShards int, parityShards int, opts ...Option) (Encoder, error) New creates a new encoder and initializes it to the number of data shards and parity shards that you want to use. You can reuse this encoder. Note that the maximum number of total shards is 256. If no options are supplied, default options are used. // 重构函数 func (Encoder) Reconstruct(shards [][]byte) error Reconstruct will recreate the missing shards if possible. Given a list of shards, some of which contain data, fills in the ones that don't have data. The length of the array must be equal to the total number of shards. You indicate that a shard is missing by setting it to nil or zero-length. If a shard is zero-length but has sufficient capacity, that memory will be used, otherwise a new []byte will be allocated. If there are too few shards to reconstruct the missing ones, ErrTooFewShards will be returned. The reconstructed shard set is complete, but integrity is not verified. Use the Verify function to check if data set is ok.RS 库封装
package rs
import (
"demo/objectStream"
"github.com/klauspost/reedsolomon"
"io"
)
const (
DATA_SHARDS = 4
PARITY_SHARDS = 2
ALL_SHARDS = DATA_SHARDS + PARITY_SHARDS
BLOCK_PER_SHARD = 8000
BLOCK_SIZE = BLOCK_PER_SHARD * DATA_SHARDS
)
type decoder struct {
readers []io.Reader
writers []io.Writer
enc reedsolomon.Encoder
size int64 // 对象大小
cache []byte // 缓存数据
cacheSize int // 缓存大小
total int64 // 当前已读大小
}
func NewDecoder(readers []io.Reader, writers []io.Writer, size int64) *decoder {
enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
return &decoder{readers, writers, enc, size, nil, 0, 0}
}
func (d *decoder) Read(p []byte) (n int, err error) {
if d.cacheSize == 0 {
e := d.getData()
if e != nil {
return 0, e
}
}
length := len(p)
if d.cacheSize < length {
length = d.cacheSize
}
d.cacheSize -= length
copy(p, d.cache[:length])
d.cache = d.cache[length:]
return length, nil
}
func (d *decoder) getData() error {
// 如果当前已解码的数据等于原始数据大小,则所有数据已经被读取,返回文件尾标识 io.EOF
if d.total == d.size {
return io.EOF
}
// 创建 []byte 类型的切片,长度为 6 ,用于保存相应切片的数据
shards := make([][]byte, ALL_SHARDS)
// 创建一个整型切片,用于保存修复切片的下标
repairIds := make([]int, 0)
// 遍历 readers
for i := range shards {
// 如果 readers[i] 为空则说明分片数据丢失需要修复
if d.readers[i] == nil {
repairIds = append(repairIds, i)
} else {
// 如果 readers[i] 不为空说明分片数据正常,则将分片数据保存到 shards[i] 中
shards[i] = make([]byte, BLOCK_PER_SHARD)
n, e := io.ReadFull(d.readers[i], shards[i])
if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
shards[i] = nil
} else if n != BLOCK_PER_SHARD {
shards[i] = shards[i][:n]
}
}
}
// 尝试重构已丢失的数据切片
e := d.enc.Reconstruct(shards)
if e != nil {
return e
}
for i := range repairIds {
id := repairIds[i]
d.writers[id].Write(shards[id])
}
for i := 0; i < DATA_SHARDS; i++ {
shardSize := int64(len(shards[i]))
if d.total+shardSize > d.size {
shardSize -= d.total + shardSize - d.size
}
d.cache = append(d.cache, shards[i][:shardSize]...)
d.cacheSize += int(shardSize)
d.total += shardSize
}
return nil
}
type encoder struct {
writers []io.Writer
enc reedsolomon.Encoder
cache []byte
}
func NewEncoder(writers []io.Writer) *encoder {
// reedsolomon 是开源的 RS 码编解码库
// 这里生成了 DATA_SHARDS 个数据分片和 PARITY_SHARDS 个校验分片的 RS 码编码器 enc
enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
return &encoder{writers, enc, nil}
}
func (e *encoder) Write(p []byte) (n int, err error) {
// 获取待写入的数据 p 的总长度
length := len(p)
// 当前缓存的数据长度
current := 0
// 将待写入的数据 p 以块的形式保存在缓存
for length != 0 {
// 块数据长度 - 已缓存数据长度 = 可缓存数据长度
next := BLOCK_SIZE - len(e.cache)
// 如果可缓存数据长度比数据长度 p 大,则可以继续缓存数据
if next > length {
next = length
}
// 新增缓存数据
e.cache = append(e.cache, p[current:current+next]...)
if len(e.cache) == BLOCK_SIZE {
e.Flush()
}
// 当前已缓存数据长度增加
current += next
// 需要保存数据 p 的长度减少
length -= next
}
return len(p), nil
}
func (e *encoder) Flush() {
// 如果缓存数据长度为 0 则直接返回
if len(e.cache) == 0 {
return
}
// 调用 Split 方法将缓存数据切分数据片,并且生成两个空的奇偶校验片
shards, _ := e.enc.Split(e.cache)
// 调用 Encode 方法生成数据分片的奇偶校验片
e.enc.Encode(shards)
for i := range shards {
// 每个数据分片文件真正写入分片数据
e.writers[i].Write(shards[i])
}
// 重置缓存大小
e.cache = []byte{}
}
type RSGetStream struct {
*decoder
}
func NewRSGetStream(locateInfo map[int]string, dataServers []string, hash string, size int64) (*RSGetStream, error) {
// 先判断数据服务节点数量是否足够
if len(locateInfo)+len(dataServers) != ALL_SHARDS {
return nil, fmt.Errorf(sys.NotEnoughServer)
}
// 创建长度为 6 的 io.Reader 切片用于读取 6 个分片的数据
readers := make([]io.Reader, ALL_SHARDS)
for i := 0; i < ALL_SHARDS; i++ {
server := locateInfo[i]
// 若节点不存在则取随机节点补充
if server == "" {
locateInfo[i] = dataServers[0]
dataServers = dataServers[1:]
continue
}
// 若节点存在则读取该节点保存的切片数据
reader, e := objectStream.NewGetStream(server, fmt.Sprintf("%s.%d", hash, i))
if e == nil {
readers[i] = reader
}
}
// 恢复分片,当节点不存在时取随机节点补充,或者是节点存在时读取切片数据抛出异常都会导致 readers 某个值为 nil
writers := make([]io.Writer, ALL_SHARDS)
perShard := (size + DATA_SHARDS - 1) / DATA_SHARDS
var e error
for i := range readers {
// 当切片元素为 nil 时,创建临时对象写入流用于恢复分片
if readers[i] == nil {
writers[i], e = objectStream.NewTempPutStream(locateInfo[i], fmt.Sprintf("%s.%d", hash, i), perShard)
if e != nil {
return nil, e
}
}
}
// 返回解码器结构体
dec := NewDecoder(readers, writers, size)
return &RSGetStream{dec}, nil
}
func (s *RSGetStream) Close() {
for i := range s.writers {
if s.writers[i] != nil {
s.writers[i].(*objectStream.TempPutStream).Commit(true)
}
}
}
func (s *RSGetStream) Seek(offset int64, whence int) (int64, error) {
if whence != io.SeekCurrent {
panic("only support SeekCurrent")
}
if offset < 0 {
panic("only support forward seek")
}
for offset != 0 {
length := int64(BLOCK_SIZE)
if offset < length {
length = offset
}
buf := make([]byte, length)
io.ReadFull(s, buf)
offset -= length
}
return offset, nil
}
type RSPutStream struct {
*encoder
}
func NewRSPutStream(dataServers []string, hash string, size int64) (*RSPutStream, error) {
// 如果参数中的数据服务节点数量小于最低标准,则无法存储数据
if len(dataServers) != ALL_SHARDS {
return nil, fmt.Errorf(sys.NotEnoughServer)
}
// 数据切片
perShard := (size + DATA_SHARDS - 1) / DATA_SHARDS
// writer 切片,用于每个数据切片的写入
writers := make([]io.Writer, ALL_SHARDS)
var e error
for i := range writers {
// 每个数据切片需要创建临时文件去保存数据
writers[i], e = objectStream.NewTempPutStream(dataServers[i],
fmt.Sprintf("%s.%d", hash, i), perShard)
if e != nil {
return nil, e
}
}
// 将初始化后的 writer 切片构建 RS 码编码器
// reedsolomon 是开源的 RS 码编解码库
// 这里通过封装的 NewEncoder 方法生成了 DATA_SHARDS 个数据分片和 PARITY_SHARDS 个校验分片的 RS 码编码器 enc 返回
enc := NewEncoder(writers)
// 返回 RS 编码结构体
// RSPutStream 没有自身的 Write 方法,则会调用内嵌的 enc 的 Write 方法
return &RSPutStream{enc}, nil
}
func (s *RSPutStream) Commit(success bool) {
// 将缓存中最后的数据写入
s.Flush()
// 将临时文件转正或者删除
for i := range s.writers {
s.writers[i].(*objectStream.TempPutStream).Commit(success)
}
}
objectStream 封装
对象流封装,主要在文件流底层使用:
package objectStream
import (
"demo/sys"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
)
type PutStream struct {
writer *io.PipeWriter // writer 用于实现 Write 方法
c chan error // c 用于将 goroutine 传输数据过程中抛出的错误传回主协程
}
func NewPutStream(server, obj string) *PutStream {
// 创建一对管道互连的 reader 和 writer,写入 writer 中的数据可以从 reader 中读出来
reader, writer := io.Pipe()
c := make(chan error)
// 由于管道读写是阻塞的,因此需要另起一个 goroutine 来向数据服务节点发起 PUT 请求,此时接口服务是一个 client 端
go func() {
// 构建请求
request, _ := http.NewRequest(http.MethodPut, "http://"+server+"/handleObjs/"+obj, reader)
client := http.Client{}
// 发起请求
r, e := client.Do(request)
// 其他异常
if e == nil && r.StatusCode != http.StatusOK {
e = fmt.Errorf(sys.DataServerError, r.StatusCode)
}
// 将异常写入管道
c <- e
}()
// 返回接口服务存储对象流
return &PutStream{
writer: writer,
c: c,
}
}
func (w *PutStream) Write(p []byte) (n int, err error) {
return w.writer.Write(p)
}
func (w *PutStream) Close() error {
w.writer.Close()
return <-w.c
}
type GetStream struct {
reader io.Reader
}
func newGetStream(url string) (*GetStream, error) {
// 向数据服务节点发起 GET 请求
r, e := http.Get(url)
if e != nil {
return nil, e
}
if r.StatusCode != http.StatusOK {
return nil, fmt.Errorf(sys.DataServerError, r.StatusCode)
}
// 返回请求响应数据
return &GetStream{
reader: r.Body,
}, nil
}
func NewGetStream(server, obj string) (*GetStream, error) {
// 当数据服务节点为空或者请求对象名字为空,则抛出异常
if server == "" || obj == "" {
return nil, fmt.Errorf(sys.InvalidServerOrObject, server, obj)
}
// 调用数据服务节点 GET 接口,返回请求响应数据
return newGetStream("http://" + server + "/handleObjs/" + obj)
}
func (r *GetStream) Read(p []byte) (n int, err error) {
return r.reader.Read(p)
}
type TempPutStream struct {
Server string
Uuid string
}
func NewTempPutStream(server, object string, size int64) (*TempPutStream, error) {
// 请求数据服务节点构建一个临时文件
request, e := http.NewRequest(http.MethodPost, "http://"+server+"/temp/"+object, nil)
if e != nil {
return nil, e
}
request.Header.Set("size", fmt.Sprintf("%d", size))
client := http.Client{}
response, e := client.Do(request)
if e != nil {
return nil, e
}
// 处理返回结果
uuid, e := ioutil.ReadAll(response.Body)
if e != nil {
return nil, e
}
// 返回缓存对象指针
return &TempPutStream{server, string(uuid)}, nil
}
func (w *TempPutStream) Write(p []byte) (n int, err error) {
request, e := http.NewRequest(http.MethodPatch, "http://"+w.Server+"/temp/"+w.Uuid, strings.NewReader(string(p)))
if e != nil {
return 0, e
}
client := http.Client{}
r, e := client.Do(request)
if e != nil {
return 0, e
}
if r.StatusCode != http.StatusOK {
return 0, fmt.Errorf(sys.DataServerError, r.StatusCode)
}
return len(p), nil
}
func (w *TempPutStream) Commit(ok bool) {
method := http.MethodDelete
if ok {
method = http.MethodPut
}
request, _ := http.NewRequest(method, "http://"+w.Server+"/temp/"+w.Uuid, nil)
client := http.Client{}
client.Do(request)
}
func NewTempGetStream(server, uuid string) (*GetStream, error) {
return newGetStream("http://" + server + "/temp/" + uuid)
}
utils 封装
封装了散列值的计算和偏移计算:
package utils
import (
"crypto/sha256"
"encoding/base64"
uuid "github.com/satori/go.uuid"
"io"
"net/http"
"strconv"
"strings"
)
func GetOffsetFromHeader(h http.Header) int64 {
byteRange := h.Get("range")
if len(byteRange) < 7 {
return 0
}
if byteRange[:6] != "bytes=" {
return 0
}
bytePos := strings.Split(byteRange[6:], "-")
offset, _ := strconv.ParseInt(bytePos[0], 0, 64)
return offset
}
func GetHashFromHeader(h http.Header) string {
digest := h.Get("digest")
if len(digest) < 9 {
return ""
}
if digest[:8] != "SHA-256=" {
return ""
}
return digest[8:]
}
func GetSizeFromHeader(h http.Header) int64 {
size, _ := strconv.ParseInt(h.Get("content-length"), 0, 64)
return size
}
func CalculateHash(r io.Reader) string {
h := sha256.New()
// h 会对写入的数据计算散列值
io.Copy(h, r)
// 散列值计算出来为二进制数据,需要 base64 编码处理
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
func GenerateUUID() string {
return uuid.NewV4().String()
}
types 封装
主要封装了各个数据服务节点响应定位对象的信息类型:
package types
type LocateMessage struct {
Addr string
Id int
}
dataServer 实现
cmd 程序入口
package main
import (
"demo/dataServer/heartbeat"
"demo/dataServer/locate"
"demo/dataServer/objects"
"demo/dataServer/tmp"
"demo/sys"
"net/http"
"os"
)
func main() {
// 扫描全文件缓存到 redis 服务器
locate.CollectObjects()
// 向 apiServers exchange 发送心跳
go heartbeat.StartHeartbeat()
// 监听定位信息
go locate.StartLocate()
// 注册URL与逻辑处理函数
// 仅处理 GET 请求,对象上传依靠 temp 接口的临时文件转正,无需 PUT 方法
http.HandleFunc("/handleObjs/", objects.Handler)
http.HandleFunc("/temp/", tmp.Handler)
// 启动并监听服务
http.ListenAndServe(os.Getenv(sys.ListenAddress), nil)
}
heartbeat 心跳机制
package heartbeat
import (
"demo/rabbitmq"
"demo/sys"
"os"
"time"
)
func StartHeartbeat() {
q := rabbitmq.New(os.Getenv(sys.RabbitmqServer))
defer q.Close()
for {
q.Publish(sys.ApiServersExchange, os.Getenv(sys.ListenAddress))
// 休眠 3s
time.Sleep(3 * time.Second)
}
}
locate 对象定位
package locate
import (
"demo/rabbitmq"
myredis "demo/redis"
"demo/sys"
"demo/types"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
)
// 多读写锁
var mutex sync.RWMutex
// 定位对象
func Locate(hash string) int {
mutex.Lock()
ok := myredis.Hexists(os.Getenv(sys.RedisHashTableName), hash)
mutex.Unlock()
if ok {
id, _ := strconv.Atoi(myredis.Hget(os.Getenv(sys.RedisHashTableName), hash))
log.Printf("node id is %d", id)
return id
}
return -1
}
func Add(hash string, id int) {
mutex.Lock()
myredis.Hset(os.Getenv(sys.RedisHashTableName), hash, string(rune(id)))
mutex.Unlock()
}
func Del(hash string) {
mutex.Lock()
myredis.Hdel(os.Getenv(sys.RedisHashTableName), hash)
mutex.Unlock()
}
// 监听定位信息
func StartLocate() {
q := rabbitmq.New(os.Getenv(sys.RabbitmqServer))
defer q.Close()
// 绑定 data 网络层
q.Bind(sys.DataServersExchange)
// 获取信息管道
c := q.Consume()
// 从管道中遍历信息,msg 为需要定位的存储对象名字
for msg := range c {
// 去掉 json 序列化的双引号
obj, e := strconv.Unquote(string(msg.Body))
if e != nil {
log.Fatalln(e)
}
// 定位存储对象,名字需要 URL 转义 url.PathEscape(obj) ,如果由前端转义则无需处理
id := Locate(obj)
if id != -1 {
// 如果存储对象存在,则回送本节点监听地址,已告知存储对象在该节点
q.Send(msg.ReplyTo, types.LocateMessage{os.Getenv(sys.ListenAddress), id})
}
}
}
// 扫描全磁盘,将所有的文件缓存到 redis table
func CollectObjects() {
objects := make(map[string]interface{})
// pattern /home/sam/files/1/objects
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
// GET 方法时,获取资源
if m == http.MethodGet {
get(w, r)
return
}
// 其他方式时,返回状态码,方法不允许
w.WriteHeader(http.StatusMethodNotAllowed)
}
func get(w http.ResponseWriter, r *http.Request) {
// 文件是否存在
file := getFile(strings.Split(r.URL.EscapedPath(), "/")[2])
if file == "" {
// 不存在则返回 404
w.WriteHeader(http.StatusNotFound)
return
}
// 存在则返回数据
sendFile(w, file)
}
func getFile(name string) string {
// 打开磁盘中的文件,检查该文件哈希值是否与请求的哈希值一致
files, _ := filepath.Glob(os.Getenv(sys.StorageRoot) + "/objects/" + name + ".*")
if len(files) != 1 {
return ""
}
file := files[0]
h := sha256.New()
sendFile(h, file)
// 计算哈希值并进行 url 转义
d := url.PathEscape(base64.StdEncoding.EncodeToString(h.Sum(nil)))
hash := strings.Split(file, ".")[2]
// 如果磁盘文件计算的哈希值与请求的哈希值不一致,说明磁盘的文件数据被降解,需要删除该错误的数据
if d != hash {
log.Printf(sys.HashMismatchThenRemove, name)
// 从全局缓存中移除该文件
locate.Del(hash)
// 磁盘删除该文件
os.Remove(file)
return ""
}
// 哈希值一致则返回文件路径
return file
}
func sendFile(w io.Writer, file string) {
// 打开请求的文件
f, _ := os.Open(file)
defer f.Close()
// 将文件写入响应流 w
io.Copy(w, f)
}
tmp 临时对象处理
package tmp
import (
"demo/dataServer/locate"
"demo/sys"
"demo/utils"
"encoding/json"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"strconv"
"strings"
)
// 临时文件结构体
type tempInfo struct {
Uuid string
Name string
Size int64
}
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
// PUT 将临时文件转换成正式文件
if m == http.MethodPut {
put(w, r)
return
}
// PATCH 写入数据到临时文件
if m == http.MethodPatch {
patch(w, r)
return
}
// POST 创建临时文件
if m == http.MethodPost {
post(w, r)
return
}
// DELETe 删除临时文件
if m == http.MethodDelete {
del(w, r)
return
}
// 其他方法响应 405
w.WriteHeader(http.StatusMethodNotAllowed)
}
func put(w http.ResponseWriter, r *http.Request) {
// 获取临时文件的 uuid
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
// 读取临时文件信息反序列化为结构体
tempinfo, e := readFromFile(uuid)
if e != nil {
log.Println(e)
// 信息文件未找到 404
w.WriteHeader(http.StatusNotFound)
return
}
// 临时文件路径拼接
infoFile := os.Getenv(sys.StorageRoot) + "/temp/" + uuid
datFile := infoFile + ".dat"
// 打开临时数据文件
f, e := os.Open(datFile)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 最终关闭文件流
defer f.Close()
// 调用 f.Stat() 获取当前临时文件信息
info, e := f.Stat()
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 查看临时文件当前大小
actual := info.Size()
// 删除临时文件信息
os.Remove(infoFile)
// 如果临时文件当前大小不等于指定的大小,则认为出错,删除临时数据文件,返回 500
if actual != tempinfo.Size {
// 删除临时数据文件
os.Remove(datFile)
log.Printf(sys.SizeMismatch, tempinfo.Size, actual)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 临时文件转正式文件
commitTempObject(datFile, tempinfo)
}
func post(w http.ResponseWriter, r *http.Request) {
// 使用命令生成 uuid 具有跨平台的局限性,这里使用一个 uuid 包生成
uuid := utils.GenerateUUID()
// 获取文件散列值
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 获取文件大小
size, e := strconv.ParseInt(r.Header.Get("size"), 0, 64)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 构建一个临时文件结构体
t := tempInfo{uuid, name, size}
// 创建一个临时文件 /home/sam/files/tmp/uuid 保存该对象的信息
e = t.writeToFile()
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 创建一个临时文件 /home/sam/files/tmp/uuid.dat 保存该对象的内容数据
os.Create(os.Getenv(sys.StorageRoot) + "/temp/" + t.Uuid + ".dat")
// 响应返回临时文件 uuid
w.Write([]byte(uuid))
}
func (t *tempInfo) writeToFile() error {
f, e := os.Create(os.Getenv(sys.StorageRoot) + "/temp/" + t.Uuid)
if e != nil {
return e
}
defer f.Close()
b, _ := json.Marshal(t)
f.Write(b)
return nil
}
func commitTempObject(datFile string, tempinfo *tempInfo) {
f, _ := os.Open(datFile)
d := url.PathEscape(utils.CalculateHash(f))
f.Close()
os.Rename(datFile, os.Getenv(sys.StorageRoot)+"/objects/"+tempinfo.Name+"."+d)
locate.Add(tempinfo.hash(), tempinfo.id())
}
func del(w http.ResponseWriter, r *http.Request) {
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
infoFile := os.Getenv(sys.StorageRoot) + "/temp/" + uuid
datFile := infoFile + ".dat"
os.Remove(infoFile)
os.Remove(datFile)
}
func patch(w http.ResponseWriter, r *http.Request) {
// 获取对象的散列值
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
// 获取对象信息
tempinfo, e := readFromFile(uuid)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 拼接临时文件路径
infoFile := os.Getenv(sys.StorageRoot) + "/temp/" + uuid
datFile := infoFile + ".dat"
// 打开临时文件,以 os.O_WRONLY|os.O_APPEND 模式打开,只写入,追加数据,不设置权限掩码
f, e := os.OpenFile(datFile, os.O_WRONLY|os.O_APPEND, 0)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 最后关闭文件流
defer f.Close()
// 将请求的数据 r.Body 拷贝到临时文件 f 中
_, e = io.Copy(f, r.Body)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
info, e := f.Stat()
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 获取临时文件当前大小
actual := info.Size()
// 如果当前大小已经超过了设定的大小,则认为出现错误,删除临时文件信息和临时文件内容,返回 500
if actual > tempinfo.Size {
os.Remove(datFile)
os.Remove(infoFile)
log.Println(sys.SizeMismatch, tempinfo.Size, actual)
w.WriteHeader(http.StatusInternalServerError)
}
}
func readFromFile(uuid string) (*tempInfo, error) {
f, e := os.Open(os.Getenv(sys.StorageRoot) + "/temp/" + uuid)
if e != nil {
return nil, e
}
defer f.Close()
b, _ := ioutil.ReadAll(f)
var info tempInfo
json.Unmarshal(b, &info)
return &info, nil
}
func (t *tempInfo) hash() string {
s := strings.Split(t.Name, ".")
return s[0]
}
func (t *tempInfo) id() int {
s := strings.Split(t.Name, ".")
id, _ := strconv.Atoi(s[1])
return id
}
apiServer 实现
cmd
package main
import (
"demo/apiServer/heartbeat"
"demo/apiServer/locate"
"demo/apiServer/objects"
"demo/apiServer/version"
"demo/sys"
"net/http"
"os"
)
func main() {
// 监听数据服务节点心跳
go heartbeat.ListenHeartbeat()
// 处理对象请求,实际上是将对象请求转发给数据服务
http.HandleFunc("/handleObjs/", objects.Handler)
// 处理定位请求
http.HandleFunc("/locateObj/", locate.Handler)
// 处理版本信息
http.HandleFunc("/versions/", version.Handler)
// 启动并监听服务
http.ListenAndServe(os.Getenv(sys.ListenAddress), nil)
}
heartbeat
package heartbeat
import (
"demo/rabbitmq"
"demo/sys"
"log"
"math/rand"
"os"
"strconv"
"sync"
"time"
)
var dataServers = make(map[string]time.Time) // 缓存数据服务节点
var m sync.RWMutex // 使用多读写锁,比互斥锁高效
func ListenHeartbeat() {
q := rabbitmq.New(os.Getenv(sys.RabbitmqServer))
defer q.Close()
// 绑定 api 网络层
q.Bind(sys.ApiServersExchange)
c := q.Consume()
// 移除过期节点
go removeExpiredDataServer()
// 监听数据服务节点心跳,将心跳信息写入全局缓存
for msg := range c {
dataServer, e := strconv.Unquote(string(msg.Body))
if e != nil {
log.Fatalln(e)
}
// 写操作互斥,防止多 goroutine 对 dataServers 同时写
m.Lock()
dataServers[dataServer] = time.Now()
m.Unlock()
}
}
func removeExpiredDataServer() {
// 每 3s 扫描一遍缓存的数据服务节点
// 若当前时间减去心跳时间超过 6s 则判定为节点过期
for {
time.Sleep(3 * time.Second)
// 写操作互斥,防止多 goroutine 对 dataServers 同时写
m.Lock()
for s, t := range dataServers {
if t.Add(6 * time.Second).Before(time.Now()) {
delete(dataServers, s)
}
}
m.Unlock()
}
}
func GetDataServers() []string {
// 读锁,可多 goroutine 对 dataServers 同时读
m.RLock()
defer m.RUnlock()
dataServer := make([]string, 0)
for s, _ := range dataServers {
dataServer = append(dataServer, s)
}
return dataServer
}
func ChooseRandomDataServer() string {
dataServer := GetDataServers()
length := len(dataServer)
if length == 0 {
return ""
}
return dataServer[rand.Intn(length)]
}
func ChooseRandomDataServers(n int, exclude map[int]string) []string {
// 存储可用的节点
candidates := make([]string, 0)
// 需要排除的节点,将 key => value 反转,便于遍历
reverseExcludeMap := make(map[string]int)
for id, addr := range exclude {
reverseExcludeMap[addr] = id
}
// 获取全部节点
servers := GetDataServers()
for i := range servers {
s := servers[i]
// 若节点不需要排除则加入到可用节点数组
_, excluded := reverseExcludeMap[s]
if !excluded {
candidates = append(candidates, s)
}
}
// 若可用节点数小于指定的节点数,返回空数组
ds := make([]string, 0)
length := len(candidates)
if length < n {
return ds
}
// 若可用节点数大于等于指定的节点数,则打乱顺序取 n 个返回
p := rand.Perm(length)
for i := 0; i < n; i++ {
ds = append(ds, candidates[p[i]])
}
// 返回结果
return ds
}
locate
package locate
import (
"demo/rabbitmq"
"demo/rs"
"demo/sys"
"demo/types"
"encoding/json"
"log"
"net/http"
"os"
"strings"
"time"
)
func Locate(name string) map[int]string {
// 创建临时消息队列
q := rabbitmq.New(os.Getenv(sys.RabbitmqServer))
// 向 data 网络层群发这个存储对象的名字
q.Publish(sys.DataServersExchange, name)
// 获取信息管道
c := q.Consume()
// 休眠一秒之后将临时消息队列关闭,防止超时阻塞
go func() {
time.Sleep(time.Second)
q.Close()
}()
// 从管道中读取定位信息
// 数据服务节点地址 Addr string
// 数据服务节点分片编号 Id int
locateInfo := make(map[int]string)
for i := 0; i < rs.ALL_SHARDS; i++ {
msg := <-c
log.Println("nodes locate info : " + string(msg.Body))
if len(msg.Body) == 0 {
return locateInfo
}
var info types.LocateMessage
// 反序列化 json 格式
json.Unmarshal(msg.Body, &info)
// 记录一个数据服务节点
locateInfo[info.Id] = info.Addr
}
// 返回数据服务节点响应结果
return locateInfo
}
func Exist(name string) bool {
return len(Locate(name)) >= rs.DATA_SHARDS
}
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
// 非 GET 方法
if m != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
objInfo := Locate(strings.Split(r.URL.EscapedPath(), "/")[2])
// 未找到该存储对象
if len(objInfo) == 0 {
w.WriteHeader(http.StatusNotFound)
return
}
obj, _ := json.Marshal(objInfo)
w.Write(obj)
}
objects
package objects
import (
"demo/apiServer/heartbeat"
"demo/apiServer/locate"
"demo/es"
"demo/rs"
"demo/sys"
"demo/utils"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strconv"
"strings"
)
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
// PUT 方法时,创建或者替换资源
if m == http.MethodPut {
put(w, r)
return
}
// GET 方法时,获取资源
if m == http.MethodGet {
get(w, r)
return
}
// 版本删除
if m == http.MethodDelete {
del(w, r)
return
}
// 其他方式时,返回状态码,方法不允许
w.WriteHeader(http.StatusMethodNotAllowed)
}
func put(w http.ResponseWriter, r *http.Request) {
// 按以前的步骤,这里应该获取存储对象名字,不过从 header 中取对象的散列值作为名字
hash := utils.GetHashFromHeader(r.Header)
if hash == "" {
log.Println(sys.MissingObjectHash)
w.WriteHeader(http.StatusBadRequest)
return
}
// 从 header 中获取 size ,创建临时文件时指定文件大小
size := utils.GetSizeFromHeader(r.Header)
// 存储请求数据,散列值要作转义
httpStatus, e := storeObject(r.Body, url.PathEscape(hash), size)
if e != nil {
log.Println(e)
w.WriteHeader(httpStatus)
return
}
if httpStatus != http.StatusOK {
w.WriteHeader(httpStatus)
return
}
// 获取名字和大小,新增一个对象版本
name := strings.Split(r.URL.EscapedPath(), "/")[2]
e = es.AddVersion(name, hash, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
}
// 返回结果
w.WriteHeader(httpStatus)
}
func storeObject(r io.Reader, obj string, size int64) (int, error) {
// 如果已存在一份数据,直接返回
if locate.Exist(obj) {
return http.StatusOK, nil
}
// 获取接口服务节点存储对象的流
stream, e := putStream(obj, size)
if e != nil {
return http.StatusServiceUnavailable, e
}
// stream 属于 TempPutStream 结构体的一个实例化,同时实现了 io.Writer 的 write 方法,作为 io.Writer
// r 是请求存储的数据,作为 io.Reader
// TeeReader 方法类似 Linux tee 命令,返回一个 reader ,当 reader 读取数据时,会从 r 中读取内容,并且写入到 stream 中
reader := io.TeeReader(r, stream)
hash := utils.CalculateHash(reader)
if url.PathEscape(hash) != obj {
// 散列值不一致,删除临时文件
stream.Commit(false)
return http.StatusBadRequest, fmt.Errorf(sys.HashMismatch, hash, obj)
}
// 散列值一致,将临时文件转成正式文件
stream.Commit(true)
// 返回成功状态码
return http.StatusOK, nil
}
func putStream(hash string, size int64) (*rs.RSPutStream, error) {
// 获取全部的数据服务节点,无需排除任何节点
servers := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil)
// 如果数据服务节点长度不等于分片长度,则无法完整保存数据,提示报错
if len(servers) != rs.ALL_SHARDS {
return nil, fmt.Errorf(sys.NotEnoughServer)
}
return rs.NewRSPutStream(servers, hash, size)
}
func get(w http.ResponseWriter, r *http.Request) {
// 获取存储对象名称和版本号
name := strings.Split(r.URL.EscapedPath(), "/")[2]
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
// 版本号字符串转数字
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
// 根据名字和版本号来获取元数据
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 元数据散列值为空则无该对象
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
// 散列值要作 URL 转移
object := url.PathEscape(meta.Hash)
// 根据散列值获取对象数据
stream, e := getStream(object, meta.Size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 将数据流拷贝到响应流 w
_, e = io.Copy(w, stream)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 流关闭时将临时对象转正
stream.Close()
}
func getStream(hash string, size int64) (*rs.RSGetStream, error) {
// 根据存储对象名称进行定位
locateInfo := locate.Locate(hash)
// 如果定位的数据服务节点数少于数据恢复的最低数量,则无法恢复完整数据,返回定位失败错误
if len(locateInfo) < rs.DATA_SHARDS {
return nil, fmt.Errorf(sys.LocateFail, hash, locateInfo)
}
// 选择数据服务节点,用于数据恢复
dataServers := make([]string, 0)
if len(locateInfo) != rs.ALL_SHARDS {
dataServers = heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS-len(locateInfo), locateInfo)
}
return rs.NewRSGetStream(locateInfo, dataServers, hash, size)
}
func del(w http.ResponseWriter, r *http.Request) {
// 获取名字
name := strings.Split(r.URL.EscapedPath(), "/")[2]
v, e := es.SearchLatestVersion(name)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 插入一条新的元数据作删除标记
e = es.PutMetadata(name, v.Version+1, 0, "")
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
version
package version
import (
"demo/es"
"encoding/json"
"log"
"net/http"
"strings"
)
func Handler(w http.ResponseWriter, r *http.Request) {
// 非 GET 方法时响应方法不允许
m := r.Method
if m != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 其实是分页参数,一页最多有 1000 条记录,默认从第 0 条开始往后取数据
// 当返回值的长度不等于 1000 时,则说明后续没有数据了,直接返回
// 当返回值等于 1000 时,说明后续可能有数据, from 则从 1000 条开始往后取数据
from := 0
size := 1000
// 若未指定名字,则切割 URL 之后名字为空字符串
name := strings.Split(r.URL.EscapedPath(), "/")[2]
for {
metas, e := es.SearchAllVersions(name, from, size)
if e != nil {
log.Println(e)
// 服务器内部错误
w.WriteHeader(http.StatusInternalServerError)
return
}
// 遍历结果集
for i := range metas {
// 格式化为 json 返回
b, _ := json.Marshal(metas[i])
w.Write(b)
w.Write([]byte("n"))
}
if len(metas) != size {
return
}
from += size
}
}
测试
模拟环境搭建
# 查看本机网络接口 ip a # 数据服务节点 eth0:1~6 # IP范围 192.168.1.1 ~ 192.168.1.1.6 # 接口服务节点 eth0:7~8 # IP范围 192.168.2.1 ~ 192.168.2.2 # 网络接口绑定多个 IP ifconfig eth0:1 192.168.1.1/24 ifconfig eth0:2 192.168.1.2/24 ifconfig eth0:3 192.168.1.3/24 ifconfig eth0:4 192.168.1.4/24 ifconfig eth0:5 192.168.1.5/24 ifconfig eth0:6 192.168.1.6/24 ifconfig eth0:7 192.168.2.1/24 ifconfig eth0:8 192.168.2.2/24 # rabbitmq-server 变量 export RABBITMQ_SERVER=amqp://yushanma:passwd@192.168.0.55:5672 # es-server 变量 export ES_SERVER=192.168.0.55:9200 # 创建文件夹 for i in `seq 1 6`; do mkdir -p files/$i/objects; done for i in `seq 1 6`; do mkdir -p files/$i/temp; done # go get 超时,使用国内代理访问 go env -w GOPROXY=https://goproxy.cn # 启动数据服务节点 LISTEN_ADDRESS=192.168.1.1:12345 STORAGE_ROOT=/home/sam/files/1 OBJ_CACHE=cache_1 go run dataServer/cmd/main.go & LISTEN_ADDRESS=192.168.1.2:12345 STORAGE_ROOT=/home/sam/files/2 OBJ_CACHE=cache_2 go run dataServer/cmd/main.go & LISTEN_ADDRESS=192.168.1.3:12345 STORAGE_ROOT=/home/sam/files/3 OBJ_CACHE=cache_3 go run dataServer/cmd/main.go & LISTEN_ADDRESS=192.168.1.4:12345 STORAGE_ROOT=/home/sam/files/4 OBJ_CACHE=cache_4 go run dataServer/cmd/main.go & LISTEN_ADDRESS=192.168.1.5:12345 STORAGE_ROOT=/home/sam/files/5 OBJ_CACHE=cache_5 go run dataServer/cmd/main.go & LISTEN_ADDRESS=192.168.1.6:12345 STORAGE_ROOT=/home/sam/files/6 OBJ_CACHE=cache_6 go run dataServer/cmd/main.go & # 启动接口服务节点 LISTEN_ADDRESS=192.168.2.1:12346 go run apiServer/cmd/main.go & LISTEN_ADDRESS=192.168.2.2:12346 go run apiServer/cmd/main.go &
启动成功:
模拟 PUT 操作# 计算散列值 Jack Ma@DESKTOP-L24D7IP MINGW64 ~/Desktop $ echo -n "这个文件会被切分为 4 + 2 个切片" | openssl dgst -sha256 -binary | base64 lg9I9AT0vluHrUeRh2WQ+HKTKUiJvanK2+0kSSn2Aac= # PUT 操作 curl -v 192.168.2.1:12346/handleObjs/test1 -XPUT -d "这个文件会被切分为 4 + 2 个切片" -H "digest:SHA-256=lg9I9AT0vluHrUeRh2WQ+HKTKUiJvanK2+0kSSn2Aac="
可以看到上传成功,接下来查看全部分片是否都已经正确上传:
ls files/?/objects
4 + 2 个数据分片都已正确上传,可以看到数据分片分别保存在 4、1、3、5 节点,校验分片分别保存在 2、6 节点,依顺序查看分片内容:
可以大概辨认出内容,部分乱码是因为切分数据导致的。
模拟 GET 操作curl -v 192.168.2.1:12346/handleObjs/test1
GET 请求成功,将各个数据服务节点的数据切片还原为最初的数据。
模拟定位操作curl -v 192.168.2.1:12346/locateObj/lg9I9AT0vluHrUeRh2WQ+HKTKUiJvanK2+0kSSn2Aac=
定位成功,所有的数据分片都返回了相应的数据服务节点。
模拟数据丢失和恢复删除数据服务节点 1 上的数据分片,重新 GET 请求,查看丢失的数据是否会被修复:
可以看到,丢失了部分数据切片,数据依然可以还原。
至此,我们使用纠删码技术实现了数据的冗余和修复。



