flink集成drools,哪里错了吗?谢谢。
val kieServices = KieServices.Factory.get(); // 通过这个静态方法去获取一个实例
val kieContainer = kieServices.getKieClasspathContainer();// 默认去读取配置文件
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text1: DataStream[String] = env.socketTextStream("192.168.171.103", 9000, '\n')
val kieSession = kieContainer.newKieSession("all-rules");// 根据这个字符串去获取kieSession
text1.map(f => {
val arr = f.split(" ")
val userId = arr(0)
val ip = arr(1)
val `type` = arr(2)
kieSession.insert(new DLoginEvent1(userId, ip, `type`))
val count = kieSession.fireAllRules()
println("Fire " + count + " rule(s)!")
})
kieSession.dispose()
env.execute(this.getClass.getSimpleName)