前面几节我们实现了最基础的客户端调用服务端,这一节来学习一下通讯中的对象序列化。

 

 

 

为什么需要序列化

netty 底层都是基于 ByteBuf 进行通讯的。

 

前面我们通过编码器/解码器专门为计算的入参/出参进行处理,这样方便我们直接使用 pojo。

 

但是有一个问题,如果想把我们的项目抽象为框架,那就需要为所有的对象编写编码器/解码器。

 

显然,直接通过每一个对象写一对的方式是不现实的,而且用户如何使用,也是未知的。

 

序列化的方式

基于字节的实现,性能好,可读性不高。

 

基于字符串的实现,比如 json 序列化,可读性好,性能相对较差。

 

ps: 可以根据个人还好选择,相关序列化可参考下文,此处不做展开。

 

json 序列化框架简介[1]

 

实现思路

可以将我们的 Pojo 全部转化为 byte,然后 Byte 转换为 ByteBuf 即可。

 

反之亦然。

 

代码实现

maven

引入序列化包:

 

 

 

<dependency> 

    <groupId>com.github.houbb</groupId> 

    <artifactId>json</artifactId> 

    <version>0.1.1</version> 

</dependency> 

服务端

核心

服务端的代码可以大大简化:

 

serverBootstrap.group(workerGroup, bossGroup) 

    .channel(NioServerSocketChannel.class) 

    // 打印日志 

    .handler(new LoggingHandler(LogLevel.INFO)) 

    .childHandler(new ChannelInitializer<Channel>() { 

        @Override 

        protected void initChannel(Channel ch) throws Exception { 

            ch.pipeline() 

                    .addLast(new RpcServerHandler()); 

        } 

    }) 

    // 这个参数影响的是还没有被accept 取出的连接 

    .option(ChannelOption.SO_BACKLOG, 128) 

    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 

    .childOption(ChannelOption.SO_KEEPALIVE, true); 

这里只需要一个实现类即可。

 

RpcServerHandler

服务端的序列化/反序列化调整为直接使用 JsonBs 实现。

 

package com.github.houbb.rpc.server.handler; 

 

 

import com.github.houbb.json.bs.JsonBs; 

import com.github.houbb.log.integration.core.Log; 

import com.github.houbb.log.integration.core.LogFactory; 

import com.github.houbb.rpc.common.model.CalculateRequest; 

import com.github.houbb.rpc.common.model.CalculateResponse; 

import com.github.houbb.rpc.common.service.Calculator; 

import com.github.houbb.rpc.server.service.CalculatorService; 

 

 

import io.netty.buffer.ByteBuf; 

import io.netty.buffer.Unpooled; 

import io.netty.channel.ChannelHandlerContext; 

import io.netty.channel.SimpleChannelInboundHandler; 

 

 

/** 

 * @author binbin.hou 

 * @since 0.0.1 

 */ 

public class RpcServerHandler extends SimpleChannelInboundHandler { 

 

 

    private static final Log log = LogFactory.getLog(RpcServerHandler.class); 

 

 

    @Override 

    public void channelActive(ChannelHandlerContext ctx) throws Exception { 

        final String id = ctx.channel().id().asLongText(); 

        log.info("[Server] channel {} connected " + id); 

    } 

 

 

    @Override 

    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 

        final String id = ctx.channel().id().asLongText(); 

 

 

        ByteBuf byteBuf = (ByteBuf)msg; 

        byte[] bytes = new byte[byteBuf.readableBytes()]; 

        byteBuf.readBytes(bytes); 

        CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class); 

        log.info("[Server] receive channel {} request: {} from ", id, request); 

 

 

        Calculator calculator = new CalculatorService(); 

        CalculateResponse response = calculator.sum(request); 

 

 

        // 回写到 client 端 

        byte[] responseBytes = JsonBs.serializeBytes(response); 

        ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes); 

        ctx.writeAndFlush(responseBuffer); 

        log.info("[Server] channel {} response {}", id, response); 

    } 

 

 

客户端

核心

客户端可以简化如下:

 

channelFuture = bootstrap.group(workerGroup) 

    .channel(NioSocketChannel.class) 

    .option(ChannelOption.SO_KEEPALIVE, true) 

    .handler(new ChannelInitializer<Channel>(){ 

        @Override 

        protected void initChannel(Channel ch) throws Exception { 

            channelHandler = new RpcClientHandler(); 

            ch.pipeline() 

                    .addLast(new LoggingHandler(LogLevel.INFO)) 

                    .addLast(channelHandler); 

        } 

    }) 

    .connect(RpcConstant.ADDRESS, port) 

    .syncUninterruptibly(); 

dawei

【声明】:商丘站长网内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。