admin管理员组

文章数量:1387411

I am putting together a drools rule engine that takes an input from a Kafka data stream. The data stream from Kafka into Flink is working well with the code receiving the data, however, the drools engine doesn't process the data due to an ongoing string of issues. This is the error log I am receiving:

2025-03-17 14:51:54 kmodule.xml found and loaded.
2025-03-17 14:51:54 2025-03-17 03:51:54,053 INFO  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Found kmodule: jar:file:/tmp/tm_172.18.0.5:40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4!/META-INF/kmodule.xml
2025-03-17 14:51:54 2025-03-17 03:51:54,099 WARN  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Unable to find pom.properties in 40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4
2025-03-17 14:51:54 2025-03-17 03:51:54,099 WARN  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - As folder project tried to fall back to pom.xml, but could not find one
2025-03-17 14:51:54 2025-03-17 03:51:54,099 WARN  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Unable to load pom.properties from/tmp/tm_172.18.0.5:40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4
2025-03-17 14:51:54 2025-03-17 03:51:54,099 WARN  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Cannot find maven pom properties for this project. Using the container's default ReleaseId
2025-03-17 14:51:54 2025-03-17 03:51:54,100 INFO  .droolspiler.kie.builder.impl.InternalKieModuleProvider [] - Creating KieModule for artifact .default:artifact:1.0.0
2025-03-17 14:51:54 2025-03-17 03:51:54,101 ERROR .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Unable to build index of kmodule.xml url=jar:file:/tmp/tm_172.18.0.5:40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4!/META-INF/kmodule.xml
2025-03-17 14:51:54 Unable to get all ZipFile entries: 40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4
2025-03-17 14:51:54 kContainer []
2025-03-17 14:51:54 Available KieBases: []

This is the other files that may be necessary:

pom.xml:

<project xmlns=".0.0" xmlns:xsi=";
  xsi:schemaLocation=".0.0 .0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>transactionruleengine</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>transactionruleengine</name>
  <url>;/url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <drools.version>8.44.0.Final</drools.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>


    <!-- Drools -->
    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-core</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-mvel</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-io</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-xml-support</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.kie</groupId>
      <artifactId>kie-api</artifactId>
      <version>${drools.version}</version>
    </dependency>



  </dependencies>

  <build>
    <plugins>
      <!-- Shade plugin to build a fat JAR -->
      <plugin>
        <groupId>.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <phase>package</phase>
            <configuration>
              <archive>
                <addMavenDescriptor>true</addMavenDescriptor>
              </archive>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                    <exclude>module-info.class</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <!-- Merge LICENSE and NOTICE files -->
                <transformer implementation=".apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/LICENSE.md</resource>
                </transformer>
                <transformer implementation=".apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/NOTICE.md</resource>
                </transformer>
                <transformer implementation=".apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.TransactionProcessor</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <filtering>true</filtering>
        <includes>
          <include>**/*</include>
        </includes>
      </resource>
    </resources>
  </build>



</project>

main code body:

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import .kie.api.KieBase;
import .kie.api.KieServices;
import .kie.api.builder.ReleaseId;
import .kie.api.runtime.KieContainer;
import .kie.api.runtime.KieSession;

import .apache.flink.apimon.eventtime.WatermarkStrategy;
import .apache.flink.connector.kafka.source.KafkaSource;
import .apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import .apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import .apache.flink.apimon.serialization.SimpleStringSchema;

import java.io.InputStream;

public class TransactionProcessor {
    public static void main(String[] args) throws Exception {
        // Kafka source configuration
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("host.docker.internal:9092")
                .setTopics("test-topic")
                .setGroupId("flink-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Flink execution environment setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Processing Kafka messages
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(json -> {
                    if (json == null) {
                        System.err.println("Received null message from Kafka.");
                        return null;  // skip the null message
                    }

                    ObjectMapper mapper = new ObjectMapper();
                    Transaction transaction;

                    // Deserialize JSON to Transaction object
                    try {
                        transaction = mapper.readValue(json, Transaction.class);
                    } catch (Exception e) {
                        System.err.println("Error deserializing JSON: " + e.getMessage());
                        return null;  // skip invalid JSON
                    }

                    // Debug: Print parsed transaction
                    System.out.println("Parsed transaction: " + transaction);

                    InputStream is = TransactionProcessor.class.getClassLoader().getResourceAsStream("META-INF/kmodule.xml");
                    if (is == null) {
                        throw new RuntimeException("kmodule.xml not found! Ensure it's in src/main/resources/META-INF/");
                    } else {
                        System.out.println("kmodule.xml found and loaded.");
                    }


                    // Drools rule engine setup
                    KieServices ks = KieServices.Factory.get();
                    KieContainer kContainer = ks.getKieClasspathContainer();
                    System.out.println("kContainer " + kContainer.verify().getMessages().toString());


                    // Initialize KieSession
                    System.out.println("Available KieBases: " + kContainer.getKieBaseNames());
                    KieBase kBase = kContainer.getKieBase();
                    KieSession kSession = kBase.newKieSession();
                    if (kSession == null) {
                        throw new RuntimeException("KieSession initialization failed.");
                    }

                    // Insert transaction and fire all rules
                    kSession.insert(transaction);
                    kSession.fireAllRules();
                    kSession.dispose();

                    return transaction;
                })
                .print();  // Output result to console

        // Execute Flink job
        env.execute("Flink Kafka Integration Example");
    }
}

kmodule.xml:

<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns=";>
    <kbase name="rules" packages="com.example.rules">
        <ksession name="ksession-rules"/>
    </kbase>
</kmodule>

pom.properties:

artifactId=transactionruleengine
groupId=com.example
version=1.0-SNAPSHOT

File Tree: enter image description here

I have tried to mode the pom.properties file into the directory which is just maven, as well as a number of other solutions, such as setting a ReleaseId without the use of pom.properties.

Any help would be greatly appreciated

I am putting together a drools rule engine that takes an input from a Kafka data stream. The data stream from Kafka into Flink is working well with the code receiving the data, however, the drools engine doesn't process the data due to an ongoing string of issues. This is the error log I am receiving:

2025-03-17 14:51:54 kmodule.xml found and loaded.
2025-03-17 14:51:54 2025-03-17 03:51:54,053 INFO  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Found kmodule: jar:file:/tmp/tm_172.18.0.5:40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4!/META-INF/kmodule.xml
2025-03-17 14:51:54 2025-03-17 03:51:54,099 WARN  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Unable to find pom.properties in 40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4
2025-03-17 14:51:54 2025-03-17 03:51:54,099 WARN  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - As folder project tried to fall back to pom.xml, but could not find one
2025-03-17 14:51:54 2025-03-17 03:51:54,099 WARN  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Unable to load pom.properties from/tmp/tm_172.18.0.5:40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4
2025-03-17 14:51:54 2025-03-17 03:51:54,099 WARN  .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Cannot find maven pom properties for this project. Using the container's default ReleaseId
2025-03-17 14:51:54 2025-03-17 03:51:54,100 INFO  .droolspiler.kie.builder.impl.InternalKieModuleProvider [] - Creating KieModule for artifact .default:artifact:1.0.0
2025-03-17 14:51:54 2025-03-17 03:51:54,101 ERROR .droolspiler.kie.builder.impl.ClasspathKieProject     [] - Unable to build index of kmodule.xml url=jar:file:/tmp/tm_172.18.0.5:40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4!/META-INF/kmodule.xml
2025-03-17 14:51:54 Unable to get all ZipFile entries: 40077-1a373e/blobStorage/job_79120341f1c31c833ed62c9081009d88/blob_p-50d87c38beef5c86faf3007e80f59dc2b8ac2fab-799229e26bc6c7811d686b3d3a63fdc4
2025-03-17 14:51:54 kContainer []
2025-03-17 14:51:54 Available KieBases: []

This is the other files that may be necessary:

pom.xml:

<project xmlns="http://maven.apache./POM/4.0.0" xmlns:xsi="http://www.w3./2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache./POM/4.0.0 http://maven.apache./xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>transactionruleengine</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>transactionruleengine</name>
  <url>http://maven.apache.</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <drools.version>8.44.0.Final</drools.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>


    <!-- Drools -->
    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-core</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-compiler</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-mvel</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-io</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.drools</groupId>
      <artifactId>drools-xml-support</artifactId>
      <version>${drools.version}</version>
    </dependency>

    <dependency>
      <groupId>.kie</groupId>
      <artifactId>kie-api</artifactId>
      <version>${drools.version}</version>
    </dependency>



  </dependencies>

  <build>
    <plugins>
      <!-- Shade plugin to build a fat JAR -->
      <plugin>
        <groupId>.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <phase>package</phase>
            <configuration>
              <archive>
                <addMavenDescriptor>true</addMavenDescriptor>
              </archive>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                    <exclude>module-info.class</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <!-- Merge LICENSE and NOTICE files -->
                <transformer implementation=".apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/LICENSE.md</resource>
                </transformer>
                <transformer implementation=".apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/NOTICE.md</resource>
                </transformer>
                <transformer implementation=".apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.TransactionProcessor</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <filtering>true</filtering>
        <includes>
          <include>**/*</include>
        </includes>
      </resource>
    </resources>
  </build>



</project>

main code body:

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import .kie.api.KieBase;
import .kie.api.KieServices;
import .kie.api.builder.ReleaseId;
import .kie.api.runtime.KieContainer;
import .kie.api.runtime.KieSession;

import .apache.flink.apimon.eventtime.WatermarkStrategy;
import .apache.flink.connector.kafka.source.KafkaSource;
import .apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import .apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import .apache.flink.apimon.serialization.SimpleStringSchema;

import java.io.InputStream;

public class TransactionProcessor {
    public static void main(String[] args) throws Exception {
        // Kafka source configuration
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("host.docker.internal:9092")
                .setTopics("test-topic")
                .setGroupId("flink-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Flink execution environment setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Processing Kafka messages
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .map(json -> {
                    if (json == null) {
                        System.err.println("Received null message from Kafka.");
                        return null;  // skip the null message
                    }

                    ObjectMapper mapper = new ObjectMapper();
                    Transaction transaction;

                    // Deserialize JSON to Transaction object
                    try {
                        transaction = mapper.readValue(json, Transaction.class);
                    } catch (Exception e) {
                        System.err.println("Error deserializing JSON: " + e.getMessage());
                        return null;  // skip invalid JSON
                    }

                    // Debug: Print parsed transaction
                    System.out.println("Parsed transaction: " + transaction);

                    InputStream is = TransactionProcessor.class.getClassLoader().getResourceAsStream("META-INF/kmodule.xml");
                    if (is == null) {
                        throw new RuntimeException("kmodule.xml not found! Ensure it's in src/main/resources/META-INF/");
                    } else {
                        System.out.println("kmodule.xml found and loaded.");
                    }


                    // Drools rule engine setup
                    KieServices ks = KieServices.Factory.get();
                    KieContainer kContainer = ks.getKieClasspathContainer();
                    System.out.println("kContainer " + kContainer.verify().getMessages().toString());


                    // Initialize KieSession
                    System.out.println("Available KieBases: " + kContainer.getKieBaseNames());
                    KieBase kBase = kContainer.getKieBase();
                    KieSession kSession = kBase.newKieSession();
                    if (kSession == null) {
                        throw new RuntimeException("KieSession initialization failed.");
                    }

                    // Insert transaction and fire all rules
                    kSession.insert(transaction);
                    kSession.fireAllRules();
                    kSession.dispose();

                    return transaction;
                })
                .print();  // Output result to console

        // Execute Flink job
        env.execute("Flink Kafka Integration Example");
    }
}

kmodule.xml:

<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://www.drools./xsd/kmodule">
    <kbase name="rules" packages="com.example.rules">
        <ksession name="ksession-rules"/>
    </kbase>
</kmodule>

pom.properties:

artifactId=transactionruleengine
groupId=com.example
version=1.0-SNAPSHOT

File Tree: enter image description here

I have tried to mode the pom.properties file into the directory which is just maven, as well as a number of other solutions, such as setting a ReleaseId without the use of pom.properties.

Any help would be greatly appreciated

Share Improve this question asked Mar 18 at 5:50 Jackson Bennett-HullinJackson Bennett-Hullin 12 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 0

I actually found a fix to the issue.

I swapped from loading the rules using a kmodule file and swapped to loading the rules into the program directly through the use of the KieFileSystem. Here is the adapted code below:

KieServices kieServices = KieServices.Factory.get();

KieFileSystem kfs = kieServices.newKieFileSystem();
Resource ruleFile = kieServices.getResources()
        .newClassPathResource("rules/transaction-rules.drl", TransactionProcessor.class.getClassLoader());

kfs.write(ruleFile);

KieBuilder kieBuilder = kieServices.newKieBuilder(kfs).buildAll();
Results results = kieBuilder.getResults();

if (results.hasMessages(Message.Level.ERROR)) {
    results.getMessages(Message.Level.ERROR).forEach(System.err::println);
    throw new RuntimeException("Error building Drools KieBase from DRL files.");
}

KieContainer kieContainer = kieServices.newKieContainer(kieServices.getRepository().getDefaultReleaseId());
KieSession kSession = kieContainer.newKieSession();

if (kSession == null) {
    throw new RuntimeException("KieSession initialization failed.");
}

kSession.insert(transaction);
kSession.fireAllRules();
kSession.dispose();

return transaction;

The above code is placed below the comment "Drools Rule Engine setup"

本文标签: