Postgres LISTEN/NOTIFY in Quarkus - transaction for each notification

I'm trying to learn Quarkus by creating a simple MQ above Postgres. I'm sticking to JDBC to keep things as performant as possible, and I've set up a trivial messages table along with triggers that NOTIFY when a new record is added.

I want to use PgSubscriber to LISTEN to these notifications, and provide an API that would allow you to react to each such notification - either by passing a handler, or by exposing a Multi.

Here's the crucial thing: I want the semantics of the API to be that each notification is wrapped in its own transaction, including the client handler/downstream operations. I've tried @Transactional, QuarkusTransaction.<whatever>, client.withTransaction and who knows what else, and I just can't get it to work.

The worst part is, I'm not even sure which parts I might be doing wrong:

  • I don't care if the API is exposed via a Multi or a lambda, but maybe wanting to wrap the thing in a transaction constrains this choice, and I'm not realizing it?
  • I'm testing this by executing an INSERT in the handler/downstream operations, then throwing an exception, and verifying that the record was not inserted (which, so far, it always has been). Maybe I'm thinking about this wrong?
  • I've tried various permutations of doing the INSERT synchronously (datasource.connection.use { ... }) and asynchronously (e.g. via client.preparedQuery). Can I support both? Do I have to choose?
  • The contents of the notification are simply the ID of the message, which I first need to fetch from the messages table before invoking client code. Am I constrained in how I do this?

I don't particularly need an answer to all/any of the above, I think I can figure it out once I see some working code and iterate from there, but I just can't seem to get there.

Would anybody be willing to provide a minimal example of how to get from a val channel: PgChannel = pgSubscriber.channel(topic) to an API that has the transactional semantics described above, and also provide a simple demonstration of how I would do "Insert a record in the handler/downstream, then throw an exception"?

Thanks!

1 Answer

Looks like the night is darkest before dawn - I finally managed to crack a working version, which then allowed me to make sense of all the stuff that was confusing me.

The heart of my confusion was this: transactions in JDBC (and anything that builds on it) and transactions in the reactive clients are completely incompatible and cannot be interchanged. This is because, fundamentally, they go through entirely different database connections, managed by entirely different pools and clients:

  • JDBC goes through the regular blocking driver (io.quarkus:quarkus-jdbc-postgresql), whose connections are managed by Agroal

  • Reactive clients go through Vert.x, which has its own connections and its own pool

As a consequence:

  • @Transactional annotations have no effect on reactive clients, and neither do the other mechanisms that do essentially the same thing, e.g. QuarkusTransaction

  • Transactions on reactive clients (pool.withTransaction) have no effect on JDBC queries (such as those done via datasource.connection.use { ... })

  • Crucially, nothing can be done about that - fundamentally, a transaction is owned by its connection, and the reactive and JDBC clients each hold their own, incompatible kind (io.vertx.sqlclient.impl.Connection vs java.sql.Connection). In theory you could perhaps hack the raw socket out of one and inject it into the other, but that's definitely not what happens out of the box. A small sketch of this mismatch follows below.
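
To make the mismatch concrete, here's a minimal, hypothetical sketch - the class name, the injected field and the blocking .await().indefinitely() are purely illustrative, and it assumes the Mutiny reactive pool (io.vertx.mutiny.pgclient.PgPool) plus the same messages table used further down. The @Transactional annotation opens a JDBC transaction on an Agroal connection, while the INSERT travels over a separate Vert.x connection, so the exception at the end rolls back nothing and the row survives:

import io.vertx.mutiny.pgclient.PgPool
import io.vertx.mutiny.sqlclient.Tuple
import jakarta.enterprise.context.ApplicationScoped
import jakarta.transaction.Transactional

@ApplicationScoped
class MixedTransactionDemo(private val reactivePool: PgPool) {

    // @Transactional opens a JTA/JDBC transaction bound to an Agroal connection.
    // The reactive INSERT below travels over a completely separate Vert.x
    // connection and commits on its own, so the exception at the end only rolls
    // back an (empty) JDBC transaction - the inserted row survives.
    @Transactional
    fun insertReactivelyThenFail(topic: String, payload: String) {
        reactivePool
            .preparedQuery("INSERT INTO messages (topic, payload) VALUES ($1, $2::jsonb)")
            .execute(Tuple.of(topic, payload))
            .await().indefinitely() // blocking here just to keep the demo simple

        throw IllegalStateException("Does NOT undo the reactive INSERT above")
    }
}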

Now, a big reason for this confusion was what is said in the docs on transactions and reactive extensions, which made it seem like these two worlds are interoperable. However, that only applies to reactive pipelines using JDBC connections, NOT to reactive pipelines using reactive clients. For pipelines using JDBC connections, and only those, the JDBC transaction is propagated via context propagation, so its lifecycle matches the lifecycle of the reactive pipeline rather than that of the function from which the pipeline is returned.
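
A rough sketch of what that JDBC-side behaviour looks like - assuming quarkus-smallrye-context-propagation is on the classpath, and with MessageRepository standing in as a hypothetical JDBC-backed bean (Panache, Hibernate or a plain DataSource, i.e. anything going through Agroal):

import io.smallrye.mutiny.Uni
import io.smallrye.mutiny.infrastructure.Infrastructure
import jakarta.enterprise.context.ApplicationScoped
import jakarta.transaction.Transactional
import java.util.UUID

// Hypothetical JDBC-backed bean: imagine markProcessed doing its work through
// Panache, Hibernate or a plain DataSource - i.e. through an Agroal connection.
@ApplicationScoped
class MessageRepository {
    fun markProcessed(id: UUID) { /* JDBC/ORM work happens here */ }
}

@ApplicationScoped
class ContextPropagationDemo(private val repository: MessageRepository) {

    // With quarkus-smallrye-context-propagation in place, the JDBC transaction
    // opened by @Transactional is propagated along the Mutiny pipeline, so it
    // commits or rolls back when the pipeline terminates, not when this method
    // returns the Uni.
    @Transactional
    fun process(id: UUID): Uni<Void> =
        Uni.createFrom().item(id)
            .emitOn(Infrastructure.getDefaultExecutor())
            .onItem().invoke { messageId ->
                // Still inside the propagated JDBC transaction, even though the
                // pipeline has hopped to another thread.
                repository.markProcessed(messageId)
            }
            .replaceWithVoid()
}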

Another source of confusion: for the reactive client specifically, if you want to perform multiple operations within the reactive transaction, you need to manually pass around the connection. Unlike with JDBC (and everything that builds on it, such as JPA, Hibernate, etc.), there's no behind-the-scenes magic that pulls the current connection out of some shared context. I think this could be done in theory, but it isn't done in practice, and this key difference is not really emphasized in the docs.
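
Concretely, that means every statement which should join the reactive transaction has to be issued through the SqlConnection that withTransaction hands you; anything issued through the pool itself grabs a different connection and commits on its own. A hedged sketch (the class and method names are made up; client is the injected Mutiny PgPool, as in the code further down):

import io.smallrye.mutiny.Uni
import io.vertx.mutiny.pgclient.PgPool
import io.vertx.mutiny.sqlclient.Tuple
import jakarta.enterprise.context.ApplicationScoped

@ApplicationScoped
class PairPublisher(private val client: PgPool) {

    // Both INSERTs join the same transaction only because both go through
    // `connection`, the SqlConnection that withTransaction hands us. Anything
    // issued through `client` (the pool) directly would grab a different
    // connection and commit on its own, outside this transaction.
    fun publishPair(topic: String, first: String, second: String): Uni<Void> =
        client.withTransaction { connection ->
            connection.preparedQuery("INSERT INTO messages (topic, payload) VALUES ($1, $2::jsonb)")
                .execute(Tuple.of(topic, first))
                .flatMap {
                    connection.preparedQuery("INSERT INTO messages (topic, payload) VALUES ($1, $2::jsonb)")
                        .execute(Tuple.of(topic, second))
                }
                .replaceWithVoid()
        }
}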

Given that, the answers to my questions are:

  • If I want to use reactive clients, it would be somewhere between cumbersome and impossible to return a Multi, since I have to use .withTransaction { }. I could, theoretically, just use connection.begin(), but then the client would need to call commit manually, which would make the API pretty cumbersome. I haven't tried exposing a Multi with normal JDBC, but my gut says that should be doable given the built-in context propagation.

  • Testing it via INSERT is fine, as long as that INSERT is executed on the same connection as the one opened in the previous step, which implies using the same approach as the previous point (reactive or JDBC). For reactive clients, that additionally means passing along the SqlConnection; for JDBC, it can be taken care of e.g. via @Transactional annotations.

  • No, I cannot support both, at least not via a single API. I need to either go full reactive client or full JDBC. As stated in the previous point, that also dictates how I have to do the INSERT.

  • Yes, I am constrained in how I do this - either full reactive, or full JDBC, as explained in the previous points.

A quick, dirty, begging-to-be-cleaned-up version that works with the reactive approach is

// Implementation

override fun subscribe(topic: String, handler: SqlConnection.(Message) -> Uni<Unit>, termination: () -> Unit) {
    val channel: PgChannel = pgSubscriber.channel(topic)

    channel.handler { id ->
        // One reactive transaction per notification. The handler receives the same
        // SqlConnection, so its statements join this transaction and any exception
        // it throws rolls the whole thing back.
        client.withTransaction { connection ->
            connection.preparedQuery("SELECT id, topic, payload, created_at FROM $TABLE_NAME WHERE id = $1")
                .execute(Tuple.of(UUID.fromString(id)))
                .map { rowSet ->
                    val row = rowSet.first()
                    Message(
                        id = row.getUUID("id"),
                        topic = row.getString("topic"),
                        payload = objectMapper.readTree(row.getString("payload")),
                        createdAt = row.getLocalDateTime("created_at").atZone(ZoneId.systemDefault())
                    ).also { logger.info("Fetched NOTIFY message: id=${it.id}, topic=${it.topic}") }
                }
                .emitOn(Infrastructure.getDefaultExecutor())
                .flatMap { connection.handler(it) }
                .onTermination().invoke(termination)
        }.subscribe().with(
            {},
            { e -> logger.error("Failed processing message $id", e) }
        )
    }
}

override fun publish(topic: String, payload: JsonNode): Uni<Message> {
    val jsonPayload = payload.toString()
    // The INSERT fires the NOTIFY trigger; listeners receive the id once it commits.
    return client
        .preparedQuery("INSERT INTO $TABLE_NAME (topic, payload) VALUES ($1, $2::jsonb) RETURNING id, created_at")
        .execute(Tuple.of(topic, jsonPayload))
        .onItem().transform { rowSet ->
            val row = checkNotNull(rowSet.firstOrNull()) { "Unable to publish message for topic $topic" }
            Message(
                id = row.getUUID("id"),
                topic = topic,
                payload = payload,
                createdAt = row.getLocalDateTime("created_at").atZone(ZoneId.systemDefault())
            ).also {
                logger.info("Published message: id='${it.id}', topic='${it.topic}'")
            }
        }
}

// Test
    @Test
    fun `subscribe should isolate transactions between messages`() {
        val latch = CountDownLatch(2)
        val failedMessageIndex = AtomicInteger(-1)
        val successMessageIndex = AtomicInteger(-1)

        val otherTopic = "otherTopic"
        val otherPayload = objectMapper.createObjectNode().put("otherIndex", 1)

        // Subscribe with a handler that will fail for one message but succeed for another
        messageQueue.subscribe(testTopic, { message ->
            val jsonPayload = otherPayload.toString()
            preparedQuery("INSERT INTO messages (topic, payload) VALUES ($1, $2::jsonb)")
                .execute(Tuple.of(otherTopic, jsonPayload))
                .map {
                    val index = message.payload.get("index").asInt()

                    if (index == 1) {
                        successMessageIndex.set(index)
                    } else if (index == 2) {
                        failedMessageIndex.set(index)
                        // Throwing an exception to simulate a failure
                        throw IllegalStateException("Simulated failure for message 2")
                    }
                }
        }, { latch.countDown() })

        // Publish two messages
        val payload1 = objectMapper.createObjectNode().put("index", 1)
        messageQueue.publish(testTopic, payload1).await().indefinitely()

        val payload2 = objectMapper.createObjectNode().put("index", 2)
        messageQueue.publish(testTopic, payload2).await().indefinitely()

        // Wait for both messages to be processed
        assertTrue(latch.await(1, TimeUnit.SECONDS))
        
        // Verify the first message was processed successfully
        assertEquals(1, successMessageIndex.get())
        // Verify the second message attempted processing but failed
        assertEquals(2, failedMessageIndex.get())

        // Verify only one message was published to otherTopic
        val otherTopicMessageCount = dataSource.connection.use { connection ->
            val statement = connection.prepareStatement(
                "SELECT count(*) FROM messages WHERE topic = ?"
            )
            statement.setString(1, otherTopic)
            val resultSet = statement.executeQuery()
            resultSet.next()
            resultSet.getInt(1)
        }
        
        assertEquals(1, otherTopicMessageCount, "Only one message should have been published to otherTopic")
    }

Hope this helps any wanderers who stumble upon this.
