您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

如何在其他课程中阅读Netty中的消息

如何在其他课程中阅读Netty中的消息

Netty框架设计为异步驱动。使用这种类比,它可以以最少的线程使用量处理大量连接。如果您要创建一个使用netty框架将调用分派到远程位置的api,则应该对调用使用相同的类比。

而不是让api直接返回值,而是使其返回aFuture<?>或aPromise<?>。在您的应用程序中有多种实现此系统的方法,最简单的方法是创建一个自定义处理程序,该处理程序将传入的请求映射到PromiseFIFO队列中的。

例如:

这在很大程度上基于这样的回答,我在过去的提交。

我们从out处理程序开始,该处理程序将请求映射到管道中的请求:

public class MyLastHandler extends SimpleInboundHandler<String> {
    private final SynchronousQueue<Promise<String>> queue;

    public MyLastHandler (SynchronousQueue<Promise<String>> queue) {
        super();
        this.queue = queue;
    }

    // The following is called messageReceived(ChannelHandlerContext, String) in 5.0.
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) {
        this.queue.remove().setSuccss(msg); 
        // Or setFailure(Throwable)
    }
}

然后,我们需要一种将命令发送到远程服务器的方法

Channel channel = ....;
SynchronousQueue<Promise<String>> queue = ....;

public Future<String> sendCommandAsync(String command) {
    return sendCommandAsync(command, new DefaultPromise<>());
}

public Future<String> sendCommandAsync(String command, Promise<String> promise) {
    synchronized(channel) {
        queue.offer(promise);
        channel.write(command);
    }
    channel.flush();
}

完成方法之后,我们需要一种方法调用它:

sendCommandAsync("USER anonymous", 
    new DefaultPromise<>().addListener(
        (Future<String> f) -> {
            String response = f.get();
            if (response.startWidth("331")) {
                // do something
            }
            // etc
        }
    )
);

如果被叫者想使用我们的api作为阻塞调用,他也可以这样做:

String response = sendCommandAsync("USER anonymous").get();
if (response.startWidth("331")) {
    // do something
}
// etc

请注意,如果Thread状态被中断,则Future.get()可能抛出InterruptedException,这与套接字读取操作不同,套接字读取操作只能通过套接字上的某些交互来取消。在中,此异常应该不是问题FutureListener

dotnet 2022/1/1 18:28:04 有332人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶