栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Go如何并发(三)广播

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Go如何并发(三)广播

服务器实现用户上线广播和消息广播
  • 数据结构
  • 流程
  • 代码

go: 用户上线广播,消息广播,消息回馈,查询在线用户,修改用户名


数据结构
// Server is the chat server. It listens on Ip:Port, keeps track of the
// connected users and relays broadcast messages to them.
type Server struct {
	Ip   string
	Port int

	// OnlineMap holds the connected users keyed by User.Name; every access
	// must hold mapLock. The field is spelled OnlineMap (not onlineMap)
	// because NewServer and all Server/User methods refer to it by that name.
	OnlineMap map[string]*User
	mapLock   sync.RWMutex

	// Message is the broadcast queue: Broadcast sends into it and
	// ListenMessage drains it.
	Message chan string
}
// User is one connected chat client.
type User struct {
	Name string // display name; NewUser initialises it to the remote address
	Addr string // remote address of the connection

	channel chan string // messages queued for delivery to this client
	conn    net.Conn    // the client's connection
}

接口:

// NewServer builds a Server for ip:port with an empty online-user table and
// an unbuffered broadcast channel. It does not start listening.
func NewServer(ip string, port int) *Server {
	return &Server{
		Ip:        ip,
		Port:      port,
		OnlineMap: map[string]*User{},
		Message:   make(chan string),
	}
}
// NewUser wraps conn in a User whose Name and Addr both start out as the
// connection's remote address, and starts the user's ListenMessage goroutine.
func NewUser(conn net.Conn) *User {
	remote := conn.RemoteAddr().String()

	u := new(User)
	u.Name = remote
	u.Addr = remote
	u.channel = make(chan string)
	u.conn = conn

	go u.ListenMessage()
	return u
}
流程

server建立Listen()后

持续监听Message管道,管道里有消息就读出来,发给OnlineMap里的每个人

新连接加入的操作为:

创建一个user
这个user的channel开始监听
把新user加入OnlineMap
广播

Start(){
	Listen()

	go ListenMessage(){
		for{
			从Message管道里读出来的消息
			发给map里的每个人
		}
	}

	for{
		Accept()  //连接上了客户端
		
		go Handle(){
			创建user
			go userListen
			改map
			broadcast(){
				sendMsg = userAddr + msg
				把sendMsg送进Message
			}
			
			go func (){
				conn.Read()
				broadcast()
			}
		}
		
	}
	
}

代码
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
)

// Server is the chat server. It listens on Ip:Port, keeps track of the
// connected users and relays broadcast messages to them.
type Server struct {
	Ip   string
	Port int

	// OnlineMap holds the connected users keyed by User.Name; every access
	// must hold mapLock. The field is spelled OnlineMap (not onlineMap)
	// because NewServer and all Server/User methods refer to it by that name.
	OnlineMap map[string]*User
	mapLock   sync.RWMutex

	// Message is the broadcast queue: Broadcast sends into it and
	// ListenMessage drains it.
	Message chan string
}

// NewServer builds a Server for ip:port with an empty online-user table and
// an unbuffered broadcast channel. It does not start listening.
func NewServer(ip string, port int) *Server {
	return &Server{
		Ip:        ip,
		Port:      port,
		OnlineMap: map[string]*User{},
		Message:   make(chan string),
	}
}

// ListenMessage is the broadcast loop: it takes each message queued on
// s.Message and forwards it to the channel of every user in OnlineMap.
// It returns only when s.Message is closed, so run it in its own goroutine.
func (s *Server) ListenMessage() {
	// Ranging over the channel ends the loop if Message is ever closed; a
	// bare receive would spin forever delivering empty strings.
	for msg := range s.Message {
		// The table is only read here, so the shared lock is sufficient and
		// does not shut out other readers while a delivery is in progress.
		s.mapLock.RLock()
		for _, client := range s.OnlineMap {
			client.channel <- msg
		}
		s.mapLock.RUnlock()
	}
}

// Broadcast queues msg for delivery to every online user, prefixed with the
// sender's address and name as "[addr] name:msg". It blocks until the
// broadcast loop accepts the message.
func (s *Server) Broadcast(user *User, msg string) {
	s.Message <- fmt.Sprintf("[%s] %s:%s", user.Addr, user.Name, msg)
}

// Handle serves one client connection until the client disconnects or a read
// fails. It registers the user in OnlineMap, announces the arrival, relays
// every chunk read from conn as a broadcast, and finally unregisters the
// user, announces the departure and closes conn.
//
// Handle blocks for the lifetime of the connection, so callers run it in its
// own goroutine (as Start does).
func (s *Server) Handle(conn net.Conn) {
	// Release the socket when the session ends; it used to stay open forever.
	defer conn.Close()

	user := NewUser(conn)
	fmt.Println("One new client connect SUCCESS!")

	s.mapLock.Lock()
	s.OnlineMap[user.Name] = user
	s.mapLock.Unlock()

	s.Broadcast(user, " online SUCCESS!")

	// Read in this goroutine: the previous inner goroutine plus select{}
	// parked one goroutine per connection forever, even after disconnect.
	buf := make([]byte, 4096)
	for {
		n, err := conn.Read(buf)
		if n > 0 {
			// Strip only a trailing line terminator. Unconditionally cutting
			// the last byte lost a real character whenever the client sent
			// data without a final newline.
			line := buf[:n]
			for len(line) > 0 && (line[len(line)-1] == '\n' || line[len(line)-1] == '\r') {
				line = line[:len(line)-1]
			}
			s.Broadcast(user, string(line))
		}
		if err != nil {
			// io.EOF is the normal "client closed the connection" signal.
			if err != io.EOF {
				fmt.Println("conn.Read error,error is ", err)
			}
			break
		}
	}

	// Unregister before announcing, so the departed user is no longer a
	// broadcast target and no longer appears online.
	s.mapLock.Lock()
	delete(s.OnlineMap, user.Name)
	s.mapLock.Unlock()

	s.Broadcast(user, "offline.")
}

// Start opens a TCP listener on Ip:Port, launches the broadcast loop and then
// accepts clients forever, serving each one on its own goroutine via Handle.
// It returns only if the listener cannot be created; Accept errors are
// logged and the loop keeps going.
func (s *Server) Start() {
	addr := fmt.Sprintf("%s:%d", s.Ip, s.Port)

	ln, listenErr := net.Listen("tcp", addr)
	if listenErr != nil {
		fmt.Println("net.Listen error,error is ", listenErr)
		return
	}
	defer ln.Close()
	fmt.Println("tcp connect SUCCESS! ")

	go s.ListenMessage()

	for {
		conn, acceptErr := ln.Accept()
		if acceptErr == nil {
			go s.Handle(conn)
			continue
		}
		fmt.Println("Accept error,error is ", acceptErr)
	}
}
package main

import(
	"net"
)

// User is one connected chat client.
type User struct {
	Name string // display name; NewUser initialises it to the remote address
	Addr string // remote address of the connection

	channel chan string // messages queued for delivery to this client
	conn    net.Conn    // the client's connection
}

// NewUser wraps conn in a User whose Name and Addr both start out as the
// connection's remote address, and starts the user's ListenMessage goroutine.
func NewUser(conn net.Conn) *User {
	remote := conn.RemoteAddr().String()

	u := new(User)
	u.Name = remote
	u.Addr = remote
	u.channel = make(chan string)
	u.conn = conn

	go u.ListenMessage()
	return u
}

// ListenMessage forwards every message queued on the user's channel to the
// client connection, one message per line. It returns when the channel is
// closed, so run it in its own goroutine.
func (u *User) ListenMessage() {
	for msg := range u.channel {
		// The terminator must be a real newline ("\n"); the previous literal
		// "n" appended the letter n to every message.
		//
		// A failed write is deliberately not fatal: this loop has to keep
		// draining the unbuffered channel, otherwise whoever is sending into
		// it would block.
		_, _ = u.conn.Write([]byte(msg + "\n"))
	}
}


把业务封装进User:

// User is one connected chat client together with the server it belongs to.
type User struct {
	Name string // display name; NewUser initialises it to the remote address
	Addr string // remote address of the connection

	channel chan string // messages queued for delivery to this client
	conn    net.Conn    // the client's connection

	server *Server // owning server: online table and broadcast queue
}

// NewUser wraps conn in a User attached to server. Name and Addr both start
// out as the connection's remote address, and the user's ListenMessage
// goroutine is started before the User is returned.
func NewUser(conn net.Conn, server *Server) *User {
	remote := conn.RemoteAddr().String()

	u := new(User)
	u.Name = remote
	u.Addr = remote
	u.channel = make(chan string)
	u.conn = conn
	u.server = server

	go u.ListenMessage()
	return u
}

// Online registers the user in the server's online table under its current
// name and then announces the arrival to everyone.
func (u *User) Online() {
	srv := u.server

	srv.mapLock.Lock()
	srv.OnlineMap[u.Name] = u
	srv.mapLock.Unlock()

	srv.Broadcast(u, " online.")
}

// Offline removes the user from the server's online table and then announces
// the departure to everyone still connected.
func (u *User) Offline() {
	srv := u.server

	srv.mapLock.Lock()
	delete(srv.OnlineMap, u.Name)
	srv.mapLock.Unlock()

	srv.Broadcast(u, " offline.")
}

// SendMessage writes msg, followed by a newline, directly to this user's
// connection, bypassing the broadcast queue.
func (u *User) SendMessage(msg string) {
	// The terminator must be a real newline ("\n"); the previous literal "n"
	// appended the letter n to every message.
	//
	// The write error is dropped on purpose: the method has no error return.
	_, _ = u.conn.Write([]byte(msg + "\n"))
}

// DoMessage interprets one line of input from this user:
//
//   - "who" lists every online user back to the sender only;
//   - "rename <name>" changes the sender's name if <name> is non-empty and
//     not already taken;
//   - anything else is broadcast to all users.
func (u *User) DoMessage(msg string) {
	srv := u.server

	switch {
	case msg == "who":
		// Listing only reads the table, so the shared lock is sufficient.
		srv.mapLock.RLock()
		for _, online := range srv.OnlineMap {
			u.SendMessage("[" + online.Addr + "]" + online.Name + " is online")
		}
		srv.mapLock.RUnlock()

	case strings.HasPrefix(msg, "rename "):
		// HasPrefix replaces msg[0:7], which panicked on any ordinary message
		// shorter than seven bytes. The prefix guarantees Split yields at
		// least two elements; the new name is the token after "rename ".
		newName := strings.Split(msg, " ")[1]
		if newName == "" {
			u.SendMessage("The new name must not be empty.")
			return
		}

		// Check and swap under one write lock: the lookup used to run
		// unlocked, racing with other writers and letting two users claim the
		// same name. Name is updated under the lock too because "who" reads
		// it while holding the lock.
		srv.mapLock.Lock()
		_, taken := srv.OnlineMap[newName]
		if !taken {
			delete(srv.OnlineMap, u.Name)
			srv.OnlineMap[newName] = u
			u.Name = newName
		}
		srv.mapLock.Unlock()

		if taken {
			u.SendMessage("This name has been used.")
			return
		}
		// SendMessage already terminates the line, so the stray "n" suffix
		// that used to follow the name is gone.
		u.SendMessage("You have update your name : " + newName)

	default:
		srv.Broadcast(u, msg)
	}
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/581996.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号