Node.js中实现RPC通信的新方法:ipc-rpc

需积分: 9 0 下载量 63 浏览量 更新于2024-11-17 收藏 11KB ZIP 举报
资源摘要信息:"ipc-rpc:RPC over Node.js process.send" 1. 技术背景 在Node.js开发中,进程间通信(IPC)是一个重要的概念。IPC允许不同的进程之间共享信息和数据。RPC(Remote Procedure Call)是一种计算机通信协议,允许一个计算机程序调用另一个地址空间(通常是共享网络的另一台计算机上)的子程序,而开发者无需为这种分布式交互编写显式的网络代码。在Node.js中,RPC可以通过多种方式实现,而ipc-rpc库提供了一种通过Node.js内置的进程间通信通道实现RPC的方法。 2. ipc-rpc概述 ipc-rpc库基于Node.js的`process.send`方法,它允许父子进程之间的通信。这种通信方式是通过创建一个客户端和服务器端来完成的。在这个场景下,客户端和服务器端都可能是运行着Node.js代码的不同进程。通过`process.send`发送的消息能够在这些进程之间传递。 3. 主要功能 ipc-rpc支持RPC的调用,并且包括对发送句柄(handle)的支持。这意味着除了基本的数据传输,还可以在进程间传递资源句柄,如文件描述符等。这种支持是IPC通信中的一个高级特性,它为进程间的数据交换提供了更多灵活性。 4. 用法示例 在给定的描述中,提到了如何设置一个简单的服务器和客户端。服务器响应状态请求,并且返回一个字符串"OK!"。这里的关键步骤包括定义一个函数对象,这个对象包含了要暴露给RPC的函数。然后创建一个RPC实例,并指定该实例作为服务器还是客户端。服务器端的实例初始化时需要传入进程对象和函数对象,而客户端则需要传入服务器端的进程对象。 下面是对用法示例的详细解析: - 定义函数对象: ```CoffeeScript functions = { status: (msg, handle, cb) -> cb null, "OK!" } ``` 这里定义了一个名为`status`的函数,它接受三个参数:`msg`是消息对象,`handle`是传递的句柄(如果有),`cb`是回调函数,用于返回操作结果。 - 创建RPC实例并设置为服务器: ```CoffeeScript rpc = new RPC(process, functions: functions, timeout: 500) ``` 使用`RPC`构造函数创建一个实例,`process`是当前Node.js进程对象,`functions`是之前定义的函数对象,`timeout`是超时设置,这里设置为500毫秒。 - 客户端实例化和错误处理: ```CoffeeScript rpc = new RPC(server, (err) => if err # probably didn't pass in something with `.send` as server throw err # ready ) ``` 创建RPC客户端实例,传入`server`参数。这里假设`server`是一个已经启动的RPC服务器实例。回调函数用于处理可能出现的错误,确保传入的对象具有`.send`方法。 - 发起请求: ```CoffeeScript rpc.request("st") ``` 客户端使用`rpc.request`方法发起对服务器的RPC调用,请求名为"st"的函数。 5. 标签说明 标签"CoffeeScript"表示示例代码使用了CoffeeScript语言编写。CoffeeScript是一种向后兼容JavaScript的编程语言,它提供了更简洁的语法,有助于提高代码的可读性和可写性。 6. 文件信息 从文件名`ipc-rpc-master`可以推断,该文件可能是ipc-rpc项目的主分支或主版本文件,包含实现RPC功能的代码库。 通过以上知识点的讲解,我们可以看出ipc-rpc库提供了一种在Node.js应用中实现进程间远程过程调用的简便方法,并且展示了如何在客户端和服务器端之间进行基本的通信交互。这种方式适用于需要在父子进程之间高效传递数据和句柄的场景。

org.apache.flume.EventDeliveryException: Failed to send events at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: localhost, port: 44444 }: RPC connection error at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:181) at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:120) at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638) at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:90) at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127) at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:210) at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:270) at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:346) ... 3 more Caused by: java.io.IOException: Error connecting to localhost/127.0.0.1:44444 at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261) at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203) at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152) at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:167) ... 10 more Caused by: java.net.ConnectException: 拒绝连接: localhost/127.0.0.1:44444 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

270 浏览量

2023-07-07 20:11:52,076 INFO [upload-pool-40] c.e.d.j.DataUnitService.DataUnitService#uploadFileToHdfs[DataUnitService.java:98] 本次文件上传HDFS用时:18s 2023-07-07 20:11:52,077 INFO [upload-pool-40] c.e.d.j.DataUnitService.DataUnitService#uploadFileToHdfs[DataUnitService.java:98] 本次文件上传HDFS用时:0s 2023-07-07 20:11:52,514 INFO [upload-pool-35] c.e.d.j.DataUnitService.DataUnitService#tohiveWy[DataUnitService.java:172] /u01/tarsftp//2023070719575912003640001.txt.gz解压>>>>>>/u01/untarsftp/ 2023-07-07 20:11:52,520 WARN [Thread-4655232] o.a.h.h.DFSClient.DFSOutputStream$DataStreamer#run[DFSOutputStream.java:558] DataStreamer Exception org.apache.hadoop.ipc.RemoteException: File /dataunit/cu_access_log/10/2023070719575912003640001.txt could only be written to 0 of the 1 minReplication nodes. There are 11 datanode(s) running and no node(s) are excluded in this o peration. at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2121) at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:286) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2706) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:875) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:561) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822) at java.security.AccessController.doPrivileged(Native Method) at javax.securi

107 浏览量