Pengantar Netty

1. Perkenalan

Pada artikel ini, kita akan melihat Netty - kerangka kerja aplikasi jaringan berbasis peristiwa asinkron.

Tujuan utama Netty adalah membangun server protokol berkinerja tinggi berdasarkan NIO (atau mungkin NIO.2) dengan pemisahan dan penggandengan yang longgar dari komponen jaringan dan logika bisnis. Mungkin menerapkan protokol yang dikenal luas, seperti HTTP, atau protokol khusus Anda sendiri.

2. Konsep Inti

Netty adalah kerangka kerja non-pemblokiran. Hal ini menyebabkan throughput yang tinggi dibandingkan dengan memblokir IO. Memahami IO non-pemblokiran sangat penting untuk memahami komponen inti Netty dan hubungannya.

2.1. Saluran

Channel adalah basis dari Java NIO. Ini mewakili koneksi terbuka yang mampu menjalankan operasi IO seperti membaca dan menulis.

2.2. Masa depan

Setiap operasi IO pada Saluran di Netty tidak memblokir.

Ini berarti bahwa setiap operasi dikembalikan segera setelah panggilan. Ada antarmuka Future dalam pustaka Java standar, tetapi tidak nyaman untuk tujuan Netty - kita hanya dapat bertanya pada Future tentang penyelesaian operasi atau memblokir utas saat ini hingga operasi selesai.

Itulah mengapa Netty memiliki antarmuka ChannelFuture sendiri . Kita bisa meneruskan callback ke ChannelFuture yang akan dipanggil setelah operasi selesai.

2.3. Acara dan Penangan

Netty menggunakan paradigma aplikasi yang digerakkan oleh peristiwa, sehingga jalur pemrosesan data adalah rangkaian peristiwa yang melalui penangan. Peristiwa dan penangan dapat dikaitkan dengan aliran data masuk dan keluar. Acara masuk dapat berupa berikut ini:

  • Aktivasi dan penonaktifan saluran
  • Baca acara operasi
  • Peristiwa pengecualian
  • Acara pengguna

Acara keluar lebih sederhana dan, umumnya, terkait dengan membuka / menutup koneksi dan menulis / membilas data.

Aplikasi Netty terdiri dari beberapa peristiwa jaringan dan logika aplikasi serta penangannya. Antarmuka dasar untuk penangan kejadian saluran adalah ChannelHandler dan leluhurnya ChannelOutboundHandler dan ChannelInboundHandler .

Netty menyediakan hierarki implementasi ChannelHandler yang sangat besar. Perlu diperhatikan adaptor yang hanya merupakan implementasi kosong, misalnya ChannelInboundHandlerAdapter dan ChannelOutboundHandlerAdapter . Kami dapat memperluas adaptor ini saat kami hanya perlu memproses sebagian dari semua peristiwa.

Selain itu, terdapat banyak implementasi protokol khusus seperti HTTP, misalnya HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator. Akan sangat baik untuk berkenalan dengan mereka di Netty's Javadoc.

2.4. Encoder dan Decoder

Saat kami bekerja dengan protokol jaringan, kami perlu melakukan serialisasi dan deserialisasi data. Untuk tujuan ini, Netty memperkenalkan ekstensi khusus ChannelInboundHandler untuk dekoder yang mampu mendekode data yang masuk. Kelas dasar dari kebanyakan decoder adalah ByteToMessageDecoder.

Untuk mengenkode data keluar, Netty memiliki ekstensi ChannelOutboundHandler yang disebut encoder. MessageToByteEncoder adalah basis untuk sebagian besar implementasi encoder . Kita dapat mengonversi pesan dari urutan byte ke objek Java dan sebaliknya dengan encoder dan decoder.

3. Contoh Aplikasi Server

Mari buat proyek yang mewakili server protokol sederhana yang menerima permintaan, melakukan kalkulasi, dan mengirimkan respons.

3.1. Dependensi

Pertama-tama, kita perlu menyediakan dependensi Netty di pom.xml kita :

 io.netty netty-all 4.1.10.Final 

Kami dapat menemukan versi terbaru di Maven Central.

3.2. Model data

Kelas data permintaan akan memiliki struktur berikut:

public class RequestData { private int intValue; private String stringValue; // standard getters and setters }

Mari kita asumsikan bahwa server menerima permintaan dan mengembalikan intValue dikalikan dengan 2. Responsnya akan memiliki satu nilai int:

public class ResponseData { private int intValue; // standard getters and setters }

3.3. Minta Decoder

Sekarang kita perlu membuat encoder dan decoder untuk pesan protokol kita.

Perlu dicatat bahwa Netty bekerja dengan buffer penerima soket , yang direpresentasikan bukan sebagai antrian tetapi hanya sebagai sekumpulan byte. Ini berarti bahwa penangan masuk kita bisa dipanggil ketika pesan lengkap tidak diterima oleh server.

Kami harus memastikan bahwa kami telah menerima pesan lengkap sebelum memproses dan ada banyak cara untuk melakukannya.

Pertama-tama, kita dapat membuat ByteBuf sementara dan menambahkan semua byte masuk sampai kita mendapatkan jumlah byte yang diperlukan:

public class SimpleProcessingHandler extends ChannelInboundHandlerAdapter { private ByteBuf tmp; @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("Handler added"); tmp = ctx.alloc().buffer(4); } @Override public void handlerRemoved(ChannelHandlerContext ctx) { System.out.println("Handler removed"); tmp.release(); tmp = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; tmp.writeBytes(m); m.release(); if (tmp.readableBytes() >= 4) { // request processing RequestData requestData = new RequestData(); requestData.setIntValue(tmp.readInt()); ResponseData responseData = new ResponseData(); responseData.setIntValue(requestData.getIntValue() * 2); ChannelFuture future = ctx.writeAndFlush(responseData); future.addListener(ChannelFutureListener.CLOSE); } } }

Contoh yang ditunjukkan di atas terlihat agak aneh, tetapi membantu kita memahami cara kerja Netty. Setiap metode penangan kita dipanggil saat kejadian yang sesuai terjadi. Jadi kami menginisialisasi buffer saat penangan ditambahkan, mengisinya dengan data saat menerima byte baru dan mulai memprosesnya saat kami mendapatkan cukup data.

We deliberately did not use a stringValue — decoding in such a manner would be unnecessarily complex. That's why Netty provides useful decoder classes which are implementations of ChannelInboundHandler: ByteToMessageDecoder and ReplayingDecoder.

As we noted above we can create a channel processing pipeline with Netty. So we can put our decoder as the first handler and the processing logic handler can come after it.

The decoder for RequestData is shown next:

public class RequestDecoder extends ReplayingDecoder { private final Charset charset = Charset.forName("UTF-8"); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { RequestData data = new RequestData(); data.setIntValue(in.readInt()); int strLen = in.readInt(); data.setStringValue( in.readCharSequence(strLen, charset).toString()); out.add(data); } }

An idea of this decoder is pretty simple. It uses an implementation of ByteBuf which throws an exception when there is not enough data in the buffer for the reading operation.

When the exception is caught the buffer is rewound to the beginning and the decoder waits for a new portion of data. Decoding stops when the out list is not empty after decode execution.

3.4. Response Encoder

Besides decoding the RequestData we need to encode the message. This operation is simpler because we have the full message data when the write operation occurs.

We can write data to Channel in our main handler or we can separate the logic and create a handler extending MessageToByteEncoder which will catch the write ResponseData operation:

public class ResponseDataEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, ResponseData msg, ByteBuf out) throws Exception { out.writeInt(msg.getIntValue()); } }

3.5. Request Processing

Since we carried out the decoding and encoding in separate handlers we need to change our ProcessingHandler:

public class ProcessingHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RequestData requestData = (RequestData) msg; ResponseData responseData = new ResponseData(); responseData.setIntValue(requestData.getIntValue() * 2); ChannelFuture future = ctx.writeAndFlush(responseData); future.addListener(ChannelFutureListener.CLOSE); System.out.println(requestData); } }

3.6. Server Bootstrap

Now let's put it all together and run our server:

public class NettyServer { private int port; // constructor public static void main(String[] args) throws Exception { int port = args.length > 0 ? Integer.parseInt(args[0]); : 8080; new NettyServer(port).run(); } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }

The details of the classes used in the above server bootstrap example can be found in their Javadoc. The most interesting part is this line:

ch.pipeline().addLast( new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler());

Here we define inbound and outbound handlers that will process requests and output in the correct order.

4. Client Application

The client should perform reverse encoding and decoding, so we need to have a RequestDataEncoder and ResponseDataDecoder:

public class RequestDataEncoder extends MessageToByteEncoder { private final Charset charset = Charset.forName("UTF-8"); @Override protected void encode(ChannelHandlerContext ctx, RequestData msg, ByteBuf out) throws Exception { out.writeInt(msg.getIntValue()); out.writeInt(msg.getStringValue().length()); out.writeCharSequence(msg.getStringValue(), charset); } }
public class ResponseDataDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { ResponseData data = new ResponseData(); data.setIntValue(in.readInt()); out.add(data); } }

Also, we need to define a ClientHandler which will send the request and receive the response from server:

public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { RequestData msg = new RequestData(); msg.setIntValue(123); msg.setStringValue( "all work and no play makes jack a dull boy"); ChannelFuture future = ctx.writeAndFlush(msg); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println((ResponseData)msg); ctx.close(); } }

Now let's bootstrap the client:

public class NettyClient { public static void main(String[] args) throws Exception { String host = "localhost"; int port = 8080; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }

Seperti yang bisa kita lihat, ada banyak detail yang sama dengan bootstrap server.

Sekarang kita dapat menjalankan metode utama klien dan melihat keluaran konsol. Seperti yang diharapkan, kami mendapatkan ResponseData dengan intValue sama dengan 246.

5. Kesimpulan

Di artikel ini, kami memiliki pengenalan singkat tentang Netty. Kami menunjukkan komponen intinya seperti Channel dan ChannelHandler . Selain itu, kami telah membuat server protokol non-pemblokiran sederhana dan klien untuk itu.

Seperti biasa, semua contoh kode tersedia di GitHub.