magic_kid_2010 2019-01-07 15:34 采纳率: 0%
浏览 834

flink集成drools,哪里错了吗?谢谢。

flink集成drools,哪里错了吗?谢谢。
val kieServices = KieServices.Factory.get(); // 通过这个静态方法去获取一个实例
val kieContainer = kieServices.getKieClasspathContainer();// 默认去读取配置文件

val env = StreamExecutionEnvironment.getExecutionEnvironment
val text1: DataStream[String] = env.socketTextStream("192.168.171.103", 9000, '\n')

val kieSession = kieContainer.newKieSession("all-rules");// 根据这个字符串去获取kieSession
text1.map(f => {
  val arr = f.split(" ")
  val userId = arr(0)
  val ip = arr(1)
  val `type` = arr(2)
  kieSession.insert(new DLoginEvent1(userId, ip, `type`))
  val count = kieSession.fireAllRules()
  println("Fire " + count + " rule(s)!")
})

kieSession.dispose()

env.execute(this.getClass.getSimpleName)
  • 写回答

1条回答 默认 最新

  • 你知我知皆知 2024-08-07 15:54
    关注

    以下回答参考 皆我百晓生券券喵儿 等免费微信小程序相关内容作答,并由本人整理回复。

    在使用Flink和Drools集成时,您可能遇到的问题是:

    1. 错误的类名:在尝试创建一个新的KieSession时,您没有指定正确的类名。应该使用new KieSession()而不是new kieSession()
    val kieSession = new KieSession();
    
    1. 导入顺序问题:在kieServices中,您似乎没有正确地导入了kieServices。在Scala中,通常您会先导入kotlinx.streams.StreamExecutionEnvironment, 然后是kafka.connect.Connectorskafka.connect.KafkaConnectOptions等组件,最后才是kieServices.
    import org.apache.kafka.connect.connector.ConnectRecord
    import org.apache.kafka.connect.data.Field
    import org.apache.kafka.connect.data.Schema
    import org.apache.kafka.connect.data.StringSchema
    import org.apache.kafka.connect.json.JsonArray
    import org.apache.kafka.connect.json.JsonObject
    import org.apache.kafka.connect.sink.SinkRecord
    import org.apache.kafka.connect.source.SourceRecord
    import org.apache.kafka.connect.util.Converter
    import org.kie.api.runtime.process.WorkflowProcess
    import org.kie.api.runtime.process.WorkflowProcessInstance
    import org.kie.api.runtime.process.WorkflowRuntimeContext
    import org.kie.api.runtime.process.WorkflowRuntimeService
    import org.kie.internal.runtime.StatelessKnowledgeSession
    import org.kie.internal.runtime.process.WorkflowRuntimeContextImpl
    import org.kie.internal.runtime.process.WorkflowRuntimeServiceImpl
    import org.kie.services.runtime.api.KieServices
    
    1. 错误的流处理环境设置:您在text1上下文中使用了socketTextStream。这将直接连接到KieServices的默认KieContainer。然而,在实际运行时,您可能会希望控制KieSession的创建和管理。为此,请确保您的流处理环境与Flink集群相匹配,并且KieServices的配置也适合您的需求。

    2. 数据类型转换:在map函数中,您尝试将每个元素分割成数组,然后从数组中提取userId、ip和t类型的值。这看起来很合理,但请注意,如果输入的数据不是字节串(例如,可能是一个JSON对象),则此操作可能导致异常或不期望的结果。在处理非字节串数据时,应考虑如何安全地进行数据解析和转换。

    3. 规则执行逻辑:在fireAllRules函数中,您正在试图执行所有规则。虽然这可能是正确的行为,但在某些情况下,可能不需要对所有的规则都进行执行。为了更具体地检查并优化性能,您可以仅执行特定的规则集或者根据特定条件选择哪些规则进行执行。

    解决这些问题的最佳方法是仔细阅读Flink和Drools的文档,了解它们之间的交互方式,并遵循最佳实践来设计和调试您的应用程序。如果您遇到任何具体的错误信息或代码片段,可以提供更多的细节以便于进一步的帮助。

    评论

报告相同问题?