解释一下下面这段代码的意思和功能@Override public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { GrpcScope<ReqT> grpcScope = new GrpcScope<>(); checkAndGetRequestId(); // 处理返回 ServerCall<ReqT, RespT> wrappedCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) { @Override public void sendMessage(RespT message) { grpcScope.setResponse(message); logAndPerf(call, ErrorCode.OK.getCode(), grpcScope); super.sendMessage(message); } }; Listener<ReqT> requestListener = next.startCall(wrappedCall, headers); return new SimpleForwardingServerCallListener<ReqT>(requestListener) { @Override public void onMessage(ReqT message) { requestListener(call, headers, message, grpcScope); super.onMessage(message); } @Override public void onHalfClose() { if (!call.isReady()) { return; } try { super.onHalfClose(); } catch (Exception e) { handleGrpcException(e, call, grpcScope); } } }; } private <ReqT, RespT> void requestListener(ServerCall<ReqT, RespT> call, Metadata headers, ReqT message, GrpcScope<ReqT> grpcScope) { grpcScope.setRequest(message); // 处理请求头部,tapo全部切换到新接口后去除从请求头获取serviceId的逻辑,并且需要优化代码 Metadata.Key<String> secret = Metadata.Key.of(SERVICE_SECRET_HEADER, Metadata.ASCII_STRING_MARSHALLER); Metadata.Key<String> service = Metadata.Key.of(SERVICE_ID_HEADER, Metadata.ASCII_STRING_MARSHALLER); String serviceSecret = headers.get(secret); String headServiceId = headers.get(service); String paramServiceId = null; // 通过serviceId是否在header中,区分新老架构请求,后续老架构完全下线后需要修改 if (isBlank(headServiceId)) { paramServiceId = getStringValue(message, REQ_SERVICE_ID); headServiceId = paramServiceId; // 后续改成注解方式 if (existsField(message, REQ_ACCOUNT_ID)) { String accountId = getStringValue(message, REQ_ACCOUNT_ID); if (isBlank(accountId)) { closeCall(call, grpcScope, -1, Status.DATA_LOSS.withDescription("account invalid!")); return; } } } grpcScope.setServiceId(headServiceId); if (!verifyService(headServiceId, paramServiceId, serviceSecret)) { log.warn("headServiceId: {}, paramServiceId: {} serviceSecret: {} verify fail", headServiceId, paramServiceId, serviceSecret); // closeCall(call, grpcScope, -2, // Status.DATA_LOSS.withDescription("service secret is not correct!")); return; } if (!RateLimitUtil.tryAcquire(call.getMethodDescriptor())) { closeCall(call, grpcScope, -3, Status.RESOURCE_EXHAUSTED.withDescription("rate limit, please try later!")); } }
时间: 2024-04-03 17:33:43 浏览: 22
这段代码是一个 gRPC 拦截器,用于处理 gRPC 调用过程中的请求和响应,并且实现了一些特定的功能。
在这个拦截器中,首先创建了一个 GrpcScope 对象,用于存储请求和响应的上下文信息。然后通过 checkAndGetRequestId() 方法获取请求的 ID。
在创建 ServerCall 对象时,使用了 SimpleForwardingServerCall 类对原始的 ServerCall 进行了包装,实现了对响应消息的拦截和处理。当收到响应消息时,将消息保存到 GrpcScope 对象中,并且调用 logAndPerf() 方法记录日志和性能数据。然后再调用父类的 sendMessage() 方法将消息发送给客户端。
在创建 Listener 对象时,同样使用了 SimpleForwardingServerCallListener 类对原始的 Listener 进行了包装,实现了对请求消息的拦截和处理。当收到请求消息时,将消息保存到 GrpcScope 对象中,并且调用 requestListener() 方法处理请求头部。然后再调用父类的 onMessage() 方法继续处理消息。
在请求处理完成后,如果调用对象还没有关闭,则调用父类的 onHalfClose() 方法关闭调用对象。如果在关闭过程中发生异常,则调用 handleGrpcException() 方法处理异常,并且将异常信息保存到 GrpcScope 对象中。
其中,requestListener() 方法用于处理请求头部。首先从请求头部中获取服务 ID 和服务密钥,如果获取不到,则从请求消息中获取服务 ID。然后通过 verifyService() 方法验证服务 ID 和服务密钥是否正确。最后使用 RateLimitUtil 工具类判断是否需要对当前调用进行限流。
总的来说,这个拦截器实现了对 gRPC 调用过程中请求和响应的拦截和处理,同时实现了一些特定的功能,如记录日志和性能数据、处理请求头部、验证服务 ID 和服务密钥、限流等。