scala实现Netty通信

在学习spark源码的时候看到spark在1.6之后底层的通信框架变成了akka和netty两种方式,默认的是用netty根据源码的思路用scala写了一个Demo级别的netty通信

package com.spark.netty
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder}

/**
  * Created by root on 2016/11/18.
  */
class NettyServer {
  def bind(host: String, port: Int): Unit = {
    //配置服务端线程池组
    //用于服务器接收客户端连接
    val bossGroup = new NioEventLoopGroup()
    //用户进行SocketChannel的网络读写
    val workerGroup = new NioEventLoopGroup()

    try {
      //是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
      val bootstrap = new ServerBootstrap()
      //将两个NIO线程组作为参数传入到ServerBootstrap
      bootstrap.group(bossGroup, workerGroup)
        //创建NioServerSocketChannel
        .channel(classOf[NioServerSocketChannel])
        //绑定I/O事件处理类
        .childHandler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(
            //            new ObjectEncoder,
            //            new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
            new ServerHandler
          )
        }
      })
      //绑定端口,调用sync方法等待绑定操作完成
      val channelFuture = bootstrap.bind(host, port).sync()
      //等待服务关闭
      channelFuture.channel().closeFuture().sync()
    } finally {
      //优雅的退出,释放线程池资源
      bossGroup.shutdownGracefully()
      workerGroup.shutdownGracefully()
    }
  }
}

object NettyServer {
  def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    val server = new NettyServer
    server.bind(host, port)
  }
}
package com.spark.netty

import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel}
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}

/**
  * Created by root on 2016/11/18.
  */

class NettyClient {
  def connect(host: String, port: Int): Unit = {
    //创建客户端NIO线程组
    val eventGroup = new NioEventLoopGroup
    //创建客户端辅助启动类
    val bootstrap = new Bootstrap
    try {
      //将NIO线程组传入到Bootstrap
      bootstrap.group(eventGroup)
        //创建NioSocketChannel
        .channel(classOf[NioSocketChannel])
        //绑定I/O事件处理类
        .handler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(
//            new ObjectEncoder,
//            new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
            new ClientHandler
          )
        }
      })
      //发起异步连接操作
      val channelFuture = bootstrap.connect(host, port).sync()
      //等待服务关闭
      channelFuture.channel().closeFuture().sync()
    } finally {
      //优雅的退出,释放线程池资源
      eventGroup.shutdownGracefully()
    }
  }
}

object NettyClient {
  def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    val client = new NettyClient
    client.connect(host, port)
  }
}
package com.spark.netty

import io.netty.buffer.{Unpooled, ByteBuf}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

/**
  * Created by root on 2016/11/18.
  */
class ServerHandler extends ChannelInboundHandlerAdapter {
  /**
    * 有客户端建立连接后调用
    */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("channelActive invoked")
  }

  /**
    * 接受客户端发送来的消息
    */
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    println("channelRead invoked")
    val byteBuf = msg.asInstanceOf[ByteBuf]
    val bytes = new Array[Byte](byteBuf.readableBytes())
    byteBuf.readBytes(bytes)
    val message = new String(bytes, "UTF-8")
    println(message)
    val back = "good boy!"
    val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))
    println(msg)
    ctx.write(resp)
  }

  /**
    * 将消息对列中的数据写入到SocketChanne并发送给对方
    */
  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    println("channekReadComplete invoked")
    ctx.flush()
  }


}
package com.spark.netty

import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter}

/**
  * Created by root on 2016/11/18.
  */
class ClientHandler extends ChannelInboundHandlerAdapter {
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("channelActive")
    val content = "hello server"
    ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8")))
    //发送case class 不在发送字符串了,封装一个字符串
    //    ctx.writeAndFlush(RegisterMsg("hello server"))
  }

  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    println("channelRead")
    val byteBuf = msg.asInstanceOf[ByteBuf]
    val bytes = new Array[Byte](byteBuf.readableBytes())
    byteBuf.readBytes(bytes)
    val message = new String(bytes, "UTF-8")
    println(message)
  }

  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    println("channeReadComplete")
    ctx.flush()
  }
//发送异常时关闭
  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
    println("exceptionCaught")
    ctx.close()
  }

}
package com.spark.netty

/**
  * Created by root on 2016/11/18.
  */
case class RegisterMsg(content: String) extends Serializable

先启动NettyServer,然后在启动NettyClient.打印结果

原文地址:https://www.cnblogs.com/itboys/p/6077640.html