栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Go语言实战】 (11) go-micro微服务 实现简单备忘录 (下) | 备忘录模块

【Go语言实战】 (11) go-micro微服务 实现简单备忘录 (下) | 备忘录模块

文章目录
  • 写在前面
  • 1. RabbitMQ创建备忘录
    • 1.1 导入配置
    • 1.2 proto
      • 1.2.1 taskModels.proto
      • 1.2.2 taskService.proto
    • 1.3 写入数据
    • 1.4 读取数据
  • 2. 备忘录其他操作
    • 2.1 获取备忘录列表
    • 2.2 获取备忘录详情
    • 2.3 更新备忘录
    • 2.4 注册到etcd中
  • 3. 接入网关
    • 3.1 接入路由
    • 3.2 编写接口(创建备忘录为例子)
    • 3.3 测试

写在前面

这一章节我们继续前一章的内容,将备忘录模块完善,我们将使用RabbitMQ作为消息队列去创建备忘录

1. RabbitMQ创建备忘录 1.1 导入配置

导入配置

[rabbitmq]
RabbitMQ = amqp
RabbitMQUser = guest
RabbitMQPassWord = guest
RabbitMQHost = localhost
RabbitMQPort = 5672

加载配置

func LoadRabbitMQ(file *ini.File) {
	RabbitMQ = file.Section("rabbitmq").Key("RabbitMQ").String()
	RabbitMQUser = file.Section("rabbitmq").Key("RabbitMQUser").String()
	RabbitMQPassWord = file.Section("rabbitmq").Key("RabbitMQPassWord").String()
	RabbitMQHost = file.Section("rabbitmq").Key("RabbitMQHost").String()
	RabbitMQPort = file.Section("rabbitmq").Key("RabbitMQPort").String()
}

连接RabbitMQ

// MQ rabbitMQ链接单例
var MQ *amqp.Connection

// 初始化rabbitMQ链接
func RabbitMQ(connString string) {
	conn, err := amqp.Dial(connString)
	if err != nil {
		panic(err)
	}
	MQ = conn
}
1.2 proto
  • task/services/protos
1.2.1 taskModels.proto

定义了task的proto模型

syntax="proto3";
package services;
option go_package ="./;protos";

message TaskModel{
    //@inject_tag: json:"Id" form:"Id"
    uint64 Id = 1;
    //@inject_tag: json:"Uid" form:"Uid"
    uint64 Uid = 2;
    //@inject_tag: json:"Title" form:"Title"
    string Title = 3;
    //@inject_tag: json:"Content" form:"Content"
    string Content = 4;
    //@inject_tag: json:"StartTime" form:"StartTime"
    int64 StartTime = 5;
    //@inject_tag: json:"EndTime" form:"EndTime"
    int64 EndTime = 6;
    //@inject_tag: json:"Status" form:"Status"
    int64 Status = 7;
    //@inject_tag: json:"CreateTime" form:"CreateTime"
    int64 CreateTime = 8;
    //@inject_tag: json:"UpdateTime" form:"UpdateTime"
    int64 UpdateTime = 9;
}

执行protoc生成pb文件

protoc --proto_path=. --micro_out=. --go_out=. taskModels.proto
1.2.2 taskService.proto

定义了taskRequest,task的请求参数。
定义了TaskListResponse,task列表的响应参数。
定义了TaskDetailResponse,task列表的详细信息。
定义了TaskService,都是定义一些增删改查的服务。

syntax="proto3";
package services;
import "taskModels.proto";
option go_package = "./;protos";

message TaskRequest{
    //@inject_tag: json:"Id" form:"Id"
    uint64 Id = 1;
    //@inject_tag: json:"Uid" form:"Uid"
    uint64 Uid = 2;
    //@inject_tag: json:"Title" form:"Title"
    string Title = 3;
    //@inject_tag: json:"Content" form:"Content"
    string Content = 4;
    //@inject_tag: json:"StartTime" form:"StartTime"
    int64 StartTime = 5;
    //@inject_tag: json:"EndTime" form:"EndTime"
    int64 EndTime = 6;
    //@inject_tag: json:"Status" form:"Status"
    int64 Status = 7;
    // @inject_tag: json:"Start" form:"Start" uri:"Start"
    uint32 Start = 8;
    // @inject_tag: json:"Limit" form:"Limit" uri:"Limit"
    uint32 Limit = 9;
}

message TaskListResponse{
  repeated TaskModel TaskList=1;
  // @inject_tag: json:"Count"
  uint32 Count=2;
}

message TaskDetailResponse{
  TaskModel TaskDetail=1;
}

service TaskService{
  rpc CreateTask(TaskRequest) returns(TaskDetailResponse);
  rpc GetTasksList(TaskRequest) returns(TaskListResponse);
  rpc GetTask(TaskRequest) returns(TaskDetailResponse);
  rpc UpdateTask(TaskRequest) returns(TaskDetailResponse);
  rpc DeleteTask(TaskRequest) returns(TaskDetailResponse);
}

执行protoc生成pb文件

protoc --proto_path=. --micro_out=. --go_out=. taskService.proto

1.3 写入数据
  • task/core/taskService.go

我们在这个go文件中将数据写入RabbitMQ当中。

  • 连接通道
	ch, err := model.MQ.Channel()
	if err != nil {
		err = errors.New("rabbitMQ err:" + err.Error())
		return err
	}
  • 声明通道队列
	q, err := ch.QueueDeclare("task_queue", true, false, false, false, nil)
	if err != nil {
		err = errors.New("rabbitMQ err:" + err.Error())
		return err
	}
  • 将请求的参数序列化,发布到队列中
	body, _ := json.Marshal(req)
	err = ch.Publish("", q.Name, false, false, amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "application/json",
		Body:         body,
	})
1.4 读取数据
  • mq-server/services/task.go

从RabbitMQ中接收数据信息再写入数据库中

  • 打开Channel
ch, err := model.MQ.Channel()
  • 从task_queue通道中获取消息
	q, err := ch.QueueDeclare("task_queue", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

name:队列名称;
durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;
autoDelete:是否自动删除;
noWait:是否非阻塞,true为是,不等待RMQ返回信息;
args:参数,传nil即可;
exclusive:是否设置排他

消息ACK保证了消息不会丢失,但是当rabbitMQ Server停止(不是consumer 挂掉)的时候,我们的所有消息都会丢失。针对这种情况,我们先确保消息队列的持久化,设置消息队列的durable选项为true

  • 公平分派消息
	err = ch.Qos(1, 0, false)
	if err != nil {
		panic(err)
	}

设置Qos,设置预取大小prefetch,当prefetch=1时,表示在没收到consumer的ACK消息之前,只会为其consumer分派一个消息。

  • 读出数据
	msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
  • 从通道中读出数据

将通道的信息,反系列化,然后在数据库中创建。

	go func() {
		for d := range msgs {
			var p model.Task
			err := json.Unmarshal(d.Body, &p)
			if err != nil {
				panic(err)
			}
			fmt.Println("d.Body",string(d.Body))
			model.DB.Create(&p)
			log.Printf("Done")
			_ = d.Ack(false) // 确认消息,必须为false
		}
	}()
2. 备忘录其他操作

构造一个服务

type TaskService struct {

}
2.1 获取备忘录列表

传入的参数:上下文信息,请求参数,响应参数。

func (*TaskService) GetTasksList(ctx context.Context, req *services.TaskRequest, res *services.TaskListResponse) error {
	if req.Limit == 0 {
		req.Limit = 6
	}
	//在数据库查找值
	var productData []model.Task
	var count uint32
	err := model.DB.Offset(req.Start).Limit(req.Limit).Where("uid=?", req.Uid).Find(&productData).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}
	err = model.DB.Model(&model.Task{}).Where("uid=?", req.Uid).Count(&count).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}

	//序类化备忘录列表
	var taskRes []*services.TaskModel
	for _, item := range productData {
		taskRes = append(taskRes, BuildTask(item))
	}
	//序列化后的结果赋给response
	res.TaskList = taskRes
	res.Count = count
	return nil
}
2.2 获取备忘录详情
func (*TaskService) GetTask(ctx context.Context, req *services.TaskRequest, res *services.TaskDetailResponse) error {
	//在数据库查找值
	productData := model.Task{}
	err := model.DB.First(&productData, req.Id).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}
	//序类化商品
	productRes := BuildTask(productData)
	//序列化后的结果赋给response
	res.TaskDetail = productRes
	return nil
}
2.3 更新备忘录
func (*TaskService) UpdateTask(ctx context.Context, req *services.TaskRequest, res *services.TaskDetailResponse) error {
	//在数据库查找值
	taskData := model.Task{}
	err := model.DB.Model(model.Task{}).Where("id = ? AND uid = ?",req.Id,req.Uid).First(&taskData).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}
	//将要更新的数据赋值给结构体
	taskData.Title = req.Title
	taskData.Status = int(req.Status)
	taskData.Content = req.Content
	//update
	err = model.DB.Save(&taskData).Error
	if err != nil {
		err = errors.New("mysql err:" + err.Error())
		return err
	}
	//序列化后的结果赋给response
	res.TaskDetail = BuildTask(taskData)
	return nil
}
2.4 注册到etcd中
  • 注册etcd
	etcdReg := etcd.NewRegistry(
		registry.Addrs("127.0.0.1:2379"),
	)
  • 得到微服务实例
	// 1. 得到微服务实例
	microService := micro.NewService(
		micro.Name("rpcTaskService"), // 设置微服务名字,用来访问的
		micro.Address("127.0.0.1:8083"),
		micro.Registry(etcdReg),
	)
  • 初始化
	microService.Init()
  • 服务注册
    将用户服务注册到etcd中
	_ = services.RegisterTaskServiceHandler(microService.Server(), new(core.TaskService))
  • 启动微服务
	_ = microService.Run()

查看etcd中http://localhost:8080/etcdkeeper/是否有该模块的注册信息

3. 接入网关 3.1 接入路由
  • api-gateway/weblib/handlers
	//备忘录服务
	authed.GET("tasks", handlers.GetTaskList)
	authed.POST("task", handlers.CreateTask)
	authed.GET("task/:id", handlers.GetTaskDetail)
	authed.DELETE("task/:id", handlers.DeleteTask)
	authed.PUT("task/:id", handlers.UpdateTask)
3.2 编写接口(创建备忘录为例子)

注意这是一个多用户的备忘录,所以我们要确保的是创建到该用户的管理下的备忘录中。

所以我们就需要用到用户的id,所以就从Authorization中取出来。

func CreateTask(ginCtx *gin.Context) {
	var taskReq services.TaskRequest
	PanicIfTaskError(ginCtx.Bind(&taskReq))
	//从gin.keys取出服务实例
	claim,_ := util.ParseToken(ginCtx.GetHeader("Authorization"))
	taskReq.Uid = uint64(claim.Id)
	taskService := ginCtx.Keys["taskService"].(services.TaskService)
	taskRes, err := taskService.CreateTask(context.Background(), &taskReq)
	PanicIfTaskError(err)
	ginCtx.JSON(200, gin.H{"data": taskRes.TaskDetail})
}
3.3 测试
  • 创建备忘录


  • 展示用户备忘录

  • 修改备忘录


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/695777.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号