2021SC@SDUSC
目录
前言
Seafile-server
源码分析
ci/run.py
/searpc/searpc.go
/lib/rpc_table.py
/python/seafile/rpcclient.py
/python/seaserv/api.py
/common/rpc-service.c
/server/seaf-server.c
结论
前言
libsearpc是rpc框架,而libevhtp是事件通知框架,两者的作用都是使客户端能够调用到所需要的服务,且都在seafile server中被调用,因此首先分析seafile server中相关代码
- rpc框架:远程过程调用协议,使客户端能够通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的思想
- libevhtp:提供http api,从而搭建http服务器
Seafile-server
Seafile的服务器核心。提供RPC给seahub、提供http api给桌面应用。
源码分析
ci/run.py
对seafile-server运行所需要的各部分组件进行部署与安装,包括libsearpc、libevhtp、ccnetserver等
...
@lru_cache()
def get_branch_json_file():
url = ''
return requests.get(url).json()
def get_project_branch(project, default_branch='master'):
travis_branch = os.environ.get('TRAVIS_BRANCH', 'master')
if project.name == 'seafile-server':
return travis_branch
conf = get_branch_json_file()
return conf.get(travis_branch, {}).get(project.name, default_branch)
class Project(object):
def __init__(self, name):
self.name = name
self.version = ''
@property
def url(self):
return ''.format(self.name)
@property
def projectdir(self):
return join(TOPDIR, self.name)
def branch(self):
return get_project_branch(self)
def clone(self):
if exists(self.name):
with cd(self.name):
shell('git fetch origin --tags')
else:
shell(
'git clone --depth=1 --branch {} {}'.
format(self.branch(), self.url)
)
@chdir
def compile_and_install(self):
cmds = [
'./autogen.sh',
'./configure --prefix={}'.format(PREFIX),
'make -j{} V=0'.format(num_jobs()),
'make install',
]
for cmd in cmds:
shell(cmd)
@chdir
def use_branch(self, branch):
shell('git checkout {}'.format(branch))
class Libsearpc(Project):
def __init__(self):
super(Libsearpc, self).__init__('libsearpc')
def branch(self):
return 'master'
...
class Libevhtp(Project):
def __init__(self):
super(Libevhtp, self).__init__('libevhtp')
def branch(self):
return 'master'
@chdir
def compile_and_install(self):
cmds = [
'cmake -DEVHTP_DISABLE_SSL=ON -DEVHTP_BUILD_SHARED=OFF',
'make',
'sudo make install',
'sudo ldconfig',
]
for cmd in cmds:
shell(cmd)
def fetch_and_build():
libsearpc = Libsearpc()
libevhtp = Libevhtp()
ccnet = CcnetServer()
seafile = SeafileServer()
libsearpc.clone()
libevhtp.clone()
ccnet.clone()
libsearpc.compile_and_install()
libevhtp.compile_and_install()
seafile.compile_and_install()
...
def main():
mkdirs(INSTALLDIR)
os.environ.update(make_build_env())
args = parse_args()
if on_github_actions() and not args.test_only:
fetch_and_build()
if on_github_actions():
dbs = ('sqlite3', 'mysql')
else:
dbs = ('sqlite3',)
for db in dbs:
start_and_test_with_db(db)
- get_branch_json_file方法:获取项目的分支json文件
- Get_project_branch方法:
- Project类:表示项目对象,有项目名和版本属性。根据这两个属性能够获取到项目的url,从而下载libsearpc和libevhtp到本地(根据url(),projectdir(),branch(),clone()等方法)。同时,使用compile_and_install()方法编译项目并进行安装
- 基于Linux环境,使用项目包包类的autogen.sh与confihure.sh脚本进行部署与配置,然后调用make命令进行安装
- Fetch_and_build方法:对libsearpc与libevhtp等进行下载与部署安装
/searpc/searpc.go
对unix的pipe传输使用searpc客户端协议(使用go语言)
import (
"bufio"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net"
)
// Client represents a connections to the RPC server.
type Client struct {
// path of the named pipe
pipePath string
// RPC service name
Service string
}
type request struct {
Service string `json:"service"`
Request string `json:"request"`
}
// Init initializes rpc client.
func Init(pipePath string, service string) *Client {
client := new(Client)
client.pipePath = pipePath
client.Service = service
return client
}
// Call calls the RPC function funcname with variadic parameters.
// The return value of the RPC function is return as interface{} type
// The true returned type can be int32, int64, string, struct (object), list of struct (objects) or JSON
func (c *Client) Call(funcname string, params ...interface{}) (interface{}, error) {
// TODO: use reflection to compose requests and parse results.
var unixAddr *net.UnixAddr
unixAddr, err := net.ResolveUnixAddr("unix", c.pipePath)
if err != nil {
err := fmt.Errorf("failed to resolve unix addr when calling rpc : %v", err)
return nil, err
}
conn, err := net.DialUnix("unix", nil, unixAddr)
if err != nil {
err := fmt.Errorf("failed to dial unix when calling rpc : %v", err)
return nil, err
}
defer conn.Close()
var req []interface{}
req = append(req, funcname)
req = append(req, params...)
jsonstr, err := json.Marshal(req)
if err != nil {
err := fmt.Errorf("failed to encode rpc call to json : %v", err)
return nil, err
}
reqHeader := new(request)
reqHeader.Service = c.Service
reqHeader.Request = string(jsonstr)
jsonstr, err = json.Marshal(reqHeader)
if err != nil {
err := fmt.Errorf("failed to convert object to json : %v", err)
return nil, err
}
header := make([]byte, 4)
binary.LittleEndian.PutUint32(header, uint32(len(jsonstr)))
_, err = conn.Write([]byte(header))
if err != nil {
err := fmt.Errorf("Failed to write rpc request header : %v", err)
return nil, err
}
_, err = conn.Write([]byte(jsonstr))
if err != nil {
err := fmt.Errorf("Failed to write rpc request body : %v", err)
return nil, err
}
reader := bufio.NewReader(conn)
buflen := make([]byte, 4)
_, err = io.ReadFull(reader, buflen)
if err != nil {
err := fmt.Errorf("failed to read response header from rpc server : %v", err)
return nil, err
}
retlen := binary.LittleEndian.Uint32(buflen)
msg := make([]byte, retlen)
_, err = io.ReadFull(reader, msg)
if err != nil {
err := fmt.Errorf("failed to read response body from rpc server : %v", err)
return nil, err
}
retlist := make(map[string]interface{})
err = json.Unmarshal(msg, &retlist)
if err != nil {
err := fmt.Errorf("failed to decode rpc response : %v", err)
return nil, err
}
if _, ok := retlist["err_code"]; ok {
err := fmt.Errorf("searpc server returned error : %v", retlist["err_msg"])
return nil, err
}
if _, ok := retlist["ret"]; ok {
ret := retlist["ret"]
return ret, nil
}
err = fmt.Errorf("No value returned")
return nil, err
}
- 结构体client用来代表一个与rpc服务器的链接,属性为pipe的路径与rpc服务的名称
- init方法用于初始化一个rpc客户端
- call方法是本文件的核心,通过本方法来调用rpc函数。
- 方法参数是需要被调用的远程函数名以及函数参数,其中参数适用interface{}类型。
- rpc函数的返回值作为interface{}类型被返回,其实际返回类型可以是整型、字符串,结构体对象、json等。
- 发送调用请求过程:首先获取到unix地址,并创建一个pipe连接;其次对rpc call进行编码;然后讲request对象转化为json,并添加rpc请求的header和body。
- 接受返回值过程:对rpc server调用函数后返回的结构读入并解码,然后返回到调用call方法的地方
/lib/rpc_table.py
定义需要生成的rpc函数,即返回类型以及形参列表
func_table = [
[ "int", [] ],
[ "int", ["int"] ],
[ "int", ["int", "int"] ],
[ "int", ["int", "string"] ],
...
]
/python/seafile/rpcclient.py
seafserver线程化rpc客户端类,定义了可能的rpc函数的返回类型以及参数列表;
通过pipe获取到其实现
from pysearpc import searpc_func, SearpcError, NamedPipeClient
class SeafServerThreadedRpcClient(NamedPipeClient):
def __init__(self, pipe_path):
NamedPipeClient.__init__(self, pipe_path, "seafserv-threaded-rpcserver")
@searpc_func("string", ["string", "string", "string", "string", "int"])
def seafile_create_repo(name, desc, owner_email, passwd, enc_version):
pass
create_repo = seafile_create_repo
@searpc_func("string", ["string", "string", "string", "string", "string", "string", "string", "int"])
def seafile_create_enc_repo(repo_id, name, desc, owner_email, magic, random_key, salt, enc_version):
pass
create_enc_repo = seafile_create_enc_repo
...
- @searpc_func:定义rpc函数
- namedpipeclient: 根据pipe_path获取到对应pipeClient从而进行对服务端函数的调用
/python/seaserv/api.py
定义http api,调用/python/seaserv/service.py中定义的服务;而这些服务通过/python/seafile/rpcclient.py文件去访
...
class SeafileAPI(object):
def __init__(self):
pass
# fileserver token
def get_fileserver_access_token(self, repo_id, obj_id, op, username, use_onetime=True):
"""Generate token for access file/dir in fileserver
op: the operation, can be 'view', 'download', 'download-dir', 'downloadblks',
'upload', 'update', 'upload-blks-api', 'upload-blks-aj',
'update-blks-api', 'update-blks-aj'
Return: the access token in string
"""
onetime = 1 if bool(use_onetime) else 0
return seafserv_threaded_rpc.web_get_access_token(repo_id, obj_id, op, username,
onetime)
def query_fileserver_access_token(self, token):
"""Get the WebAccess object
token: the access token in string
Return: the WebAccess object (lib/webaccess.vala)
"""
return seafserv_threaded_rpc.web_query_access_token(token)
...
class CcnetAPI(object):
def __init__(self):
pass
# user management
def add_emailuser(self, email, passwd, is_staff, is_active):
return ccnet_threaded_rpc.add_emailuser(email, passwd, is_staff, is_active)
def remove_emailuser(self, source, email):
"""
source can be 'DB' or 'LDAP'.
- 'DB': remove a user created in local database
- 'LDAP': remove a user imported from LDAP
"""
return ccnet_threaded_rpc.remove_emailuser(source, email)
...
- seafileAPI:用户访问的功能接口
- ccnetAPI: 系统管理、用户管理等功能接口
/common/rpc-service.c
对/include/seafile-rpc.h中定义的函数进行实现,并且通过pipe传输被/python/seafile/rpcclient.py中定义的python方法访问
/server/seaf-server.c
创建rpc服务端并注册函数(即/include/seafile-rpc.h中定义函数),从而能被pipe访问
结论
首先,run.py文件的作用是在部署项目时下载所需要的组件并对其进行部署安装,从而时seafile-server能够运行;
其次在seafile-server中,提供给客户端的函数由c语言在rpc-service.c文件中实现;并在seaf-server.c中rpc服务端中注册;
然后通过基于go语言的searpc.go文件使用 unix 管道传输实现 searpc 客户端协议;
在api.py以及service.py文件中定义了服务器提供的接口及服务,并通过rpcclient.py文件中定义的rpc客户端类通过unix管道传输访问searpc服务器端的函数实现



