public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); System.setProperty("java.security.krb5.conf", "D:/project/lab-project/src/main/resources/kerberos/dev/krb5.conf"); System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); System.setProperty("java.security.auth.login.config", "D:/project/lab-project/src/main/resources/kerberos/dev/jaas.conf"); Configuration conf = new Configuration(); conf.set("hadoop.security.authentication", "kerberos"); try { UserGroupInformation.loginUserFromKeytab("c.dev.hdfs", "D:/project/lab-project/src/main/resources/kerberos/dev/ic.dev.hdfs.keytab"); System.out.println("鉴权ok"); UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<Object>() { @Override public Object run() { FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("connect-configs", new StringSchema(), getKafkaProperties()); consumer.setStartFromEarliest(); env.addSource(consumer).print(); try { env.execute("flink kafka test"); } catch (Exception e) { throw new RuntimeException(e); } return null; } }); } catch (IOException e) { System.out.println("连接异常"); throw new RuntimeException(e); } }
时间: 2024-02-10 20:35:16 浏览: 123
多线程训练营资料1.xls.md
这段代码看起来是一个 Flink 程序,用于从 Kafka 消费数据。代码中涉及到了 Kerberos 认证,包括设置 Kerberos 配置、登录用户,以及设置 Jaas 配置等。在代码中,使用了 UserGroupInformation 类的 loginUserFromKeytab 方法来登录 Kerberos,并使用 doAs 方法来运行 Flink 应用程序。在 Flink 应用程序中,使用 FlinkKafkaConsumer 类来从 Kafka 中消费数据,并打印输出,最后通过 env.execute 方法来执行 Flink 应用程序。
需要注意的是,代码中的一些路径和配置信息需要根据实际情况进行修改,比如 krb5.conf 和 jaas.conf 的路径,以及 Kerberos principal 和 keytab 的路径等。同时,这段代码中的 Kerberos 认证需要与 Kafka 集群中的 Kerberos 认证设置相匹配,否则可能会导致认证失败。
阅读全文