栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

如何做一个国产数据库(七) 网络传输 java做订阅客户端

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何做一个国产数据库(七) 网络传输 java做订阅客户端

如何做一个国产数据库一
如何做一个国产数据库二
如何做一个国产数据库三
如何做一个国产数据库四
如何做一个国产数据库五
如何做一个国产数据库六

server端协议定义

再次强调一下我们的protocol 应用层的协议,其中协议第一个字节的前两位如下所示
//1字节 2位
//00 发布数据
//01 订阅数据
//10 心跳数据
//11 返回数据
所以服务端在接收到头部字节结束后,可以知道客户端时订阅客户端还是发布了

	int on_headers_complete(void* param) {
		//client_t* pclient = (client_t*)param;

		printf("the header len is %dn", pclient->recvlen);
		//printf("the id is %04xn", getid(pclient));
		client_t* cl = (client_t*)param;
		//得到头部字节
		char head = cl->head[0];
		char type = head >> 6;
		switch (type)
		{
		case 0x00://00 发布数据
		//放入发布列表
			cout << "publish" << endl;
			break;
		case 0x01://01 订阅数据
		    //放入订阅列表
			cout << "subscribe" << endl;
			break;
		case 0x02://10 心跳数据
			cout << "heartbeat" << endl;
			break;
		}
		return 0;
	}

接下去,就可以把订阅和发布客户端分别放到不同的队列里面去了,暂时不讲这些,先讲如何使用java做我们的订阅客户端,java最常用的就是netty,下面我们使用netty来做一个客户端。以下是tcpclient.java

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.ChannelPipeline;
public class TcpClient {
    
    // 要请求的服务器的ip地址
    private String ip;
    // 服务器的端口
    private int port;
    
    public TcpClient(String ip, int port){
        this.ip = ip;
        this.port = port;
    }
    
    // 请求端主题
    private void action() throws InterruptedException {
        
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        
        Bootstrap bs = new Bootstrap();
        
        bs.group(bossGroup)
          .channel(NioSocketChannel.class)
          .option(ChannelOption.SO_KEEPALIVE, true)
          .handler(new ChannelInitializer() {
              @Override
              protected void initChannel(SocketChannel socketChannel) throws Exception 
              {              
                    ChannelPipeline p = socketChannel.pipeline();
                    p.addLast(new MessageDecodeClient(255, 6, 1));
                    // 处理来自服务端的响应信息
                    socketChannel.pipeline().addLast(new TcpClientHandle());
              }
         });
        
        // 客户端开启
        ChannelFuture cf = bs.connect(ip, port).sync();
        byte[] respByte = new byte[6];
        //....以下为协议写入省略,请注意自行写出,若有问题,可以探讨
        
        // 发送客户端的请求
        cf.channel().writeAndFlush(Unpooled.copiedBuffer(respByte));

        
        // 等待直到连接中断
        cf.channel().closeFuture().sync();      
    }
            
    public static void main(String[] args) throws InterruptedException {
        new TcpClient("127.0.0.1", 8054).action();
    }
        
}

以下为解码函数

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldbasedframeDecoder;


public class MessageDecodeClient extends LengthFieldbasedframeDecoder {
 
  
    private static final int hsize = 6;
 
    public MessageDecodeClient(int maxframeLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxframeLength, lengthFieldOffset, lengthFieldLength);
    }
 
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null) {
            return null;
        }
 
        if (in.readableBytes() < hsize) {
            return null;
        }
 
        in.markReaderIndex();
 
        byte magic = in.readByte(); //头部字节0x69
        byte titlelen = in.readByte(); //四字节大端ID号码
        int dataLength = in.readIntLE();
        dataLength += titlelen;
        
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return null;
        }
        //钱波 :根据协议加上titlelen 和 数据len
        byte[] data = new byte[dataLength]; 
        in.readBytes(data);
 
        String body = new String(data, "UTF-8");
        return body;
    }
}

处理的TcpClientHandle .java

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


public class TcpClientHandle extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      //  empty
      //可做一些工作
    }

  
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
        ctx.writeAndFlush("客户端"+ InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! n");
        super.channelActive(ctx);
    }

      
      @Override
      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
          System.out.println("channelInactive");
      }
   
      
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          cause.printStackTrace();
      }
   

}

以上为主要的框架,读者可以根据基础自行写出,后面我也会补充写完整,希望能给您有所启发。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/785230.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号