分享

Netty客户端发送消息并同步获取结果

 wwq图书世界 2022-06-08 发布于山东

客户端发送消息并同步获取结果,实际上是违背Netty的设计原则的,可是有时候不得不这么作的话,那么建议进行以下的设计:css

好比咱们的具体用法以下:html  NettyRequest request = new NettyRequest();                 request.setRequestId(UUID.randomUUID().toString());                 request.setClassName(method.getDeclaringClass().getName());                 request.setMethodName(method.getName());                 request.setParameterTypes(method.getParameterTypes());                 request.setParameterValues(args);                 NettyMessage nettyMessage = new NettyMessage();                 nettyMessage.setType(MessageType.SERVICE_REQ.value());                 nettyMessage.setBody(request);                 if (serviceDiscovery != null) {                     serverAddress = serviceDiscovery.discover();                 }                 String[] array = serverAddress.split(":");                 String host = array[0];                 int port = Integer.parseInt(array[1]);                 NettyClient client = new NettyClient(host, port);                 NettyMessage nettyResponse = client.send(nettyMessage);                 if (nettyResponse != null) {                     return JSON.toJSONString(nettyResponse.getBody());                 } else {                     return null;                 }

先来看看NettyClient的写法 和 send方法的写法:bootstrappublic class NettyClient {     /**      * 日志记录      */     private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);     /**      * 客户端业务处理handler      */     private ClientHandler clientHandler = new ClientHandler();     /**      * 事件池      */     private EventLoopGroup group = new NioEventLoopGroup();     /**      * 启动器      */     private Bootstrap bootstrap = new Bootstrap();     /**      * 客户端通道      */     private Channel clientChannel;     /**      * 客户端链接      * @param host      * @param port      * @throws InterruptedException      */     public NettyClient(String host, int port) throws InterruptedException {         bootstrap.group(group)                 .channel(NioSocketChannel.class)                 .option(ChannelOption.TCP_NODELAY, true)                 .handler(new ChannelInitializer() {                     @Override                     protected void initChannel(SocketChannel channel) throws Exception {                         channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 5, 12));                         channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4));                         channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());                         channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler());                         channel.pipeline().addLast("clientHandler", clientHandler);                         channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler());                     }                 });         //发起同步链接操做         ChannelFuture channelFuture = bootstrap.connect(host, port);         //注册链接事件         channelFuture.addListener((ChannelFutureListener)future -> {             //若是链接成功             if (future.isSuccess()) {                 logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已链接...");                 clientChannel = channelFuture.channel();             }             //若是链接失败,尝试从新链接             else{                 logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]链接失败,从新链接中...");                 future.channel().close();                 bootstrap.connect(host, port);             }         });         //注册关闭事件         channelFuture.channel().closeFuture().addListener(cfl -> {             close();             logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");         });     }     /**      * 客户端关闭      */     private void close() {         //关闭客户端套接字         if(clientChannel!=null){             clientChannel.close();         }         //关闭客户端线程组         if (group != null) {             group.shutdownGracefully();         }     }     /**      * 客户端发送消息      * @param message      * @return      * @throws InterruptedException      * @throws ExecutionException      */     public NettyMessage send(NettyMessage message) throws InterruptedException, ExecutionException {         ChannelPromise promise = clientHandler.sendMessage(message);         promise.await(3, TimeUnit.SECONDS);         return clientHandler.getResponse();     } }

能够看出,咱们使用了clientHandler来进行消息发送行为,经过promise阻塞来同步获取返回结果,接下来看看sendMessage的写法:promisepublic class ClientHandler extends ChannelInboundHandlerAdapter {     private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);     private ChannelHandlerContext ctx;     private ChannelPromise promise;     private NettyMessage response;     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {         super.channelActive(ctx);         this.ctx = ctx;     }     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {         NettyMessage message = (NettyMessage) msg;         if (message != null && message.getType() == MessageType.SERVICE_RESP.value()) {             response = message;             promise.setSuccess();         } else {             ctx.fireChannelRead(msg);         }     }     public synchronized ChannelPromise sendMessage(Object message) {         while (ctx == null) {             try {                 TimeUnit.MILLISECONDS.sleep(1);                 //logger.error("等待ChannelHandlerContext实例化");             } catch (InterruptedException e) {                 logger.error("等待ChannelHandlerContext实例化过程当中出错",e);             }         }         promise = ctx.newPromise();         ctx.writeAndFlush(message);         return promise;     }     public NettyMessage getResponse(){         return response;     } }

能够看到,在利用ChannelHanderContext进行发送消息前,咱们先建立了一个promise并返回给send方法,那么send方法此时就会阻塞等待;当咱们收到服务端消息后,promise.setSuccess就会解除send方法的等待行为,这样咱们就能获取结果了。dom

此法针对真正须要同步等待获取结果的场景,如非必要,仍是建议利用future来改造。ide

benchmark测试代表,此种同步获取结果的行为,表现挺稳定的,可是ops 在 150 左右, 真是性能太差了。高性能场合禁用此法。oop

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多