admin管理员组文章数量:1344927
I'm trying to learn Quarkus by creating a simple MQ above Postgres. I'm sticking to JDBC to keep things as performant as possible, and I've set up a trivial messages
table along with triggers that NOTIFY
when a new record is added.
I want to use PgSubscriber to LISTEN
to these notifications, and provide an API that would allow you to react to each such notification - either by passing a handler, or by exposing a Multi
.
Here's the crucial thing: I want the semantics of the API to be that each notification is wrapped in its own transaction, including the client handler/downstream operations. I've tried @Transactional
, QuarkusTransaction.<whatever>
, client.withTransaction
and who knows what else, and I just can't get it to work.
The worst part is, I'm not even sure which parts I might be doing wrong:
- I don't care if the API is exposed via a
Multi
or a lambda, but maybe wanting to wrap the thing in a transaction constrains this choice, and I'm not realizing it? - I'm testing this by executing an
INSERT
in the handler/downstream operations, then throwing an exception, and verifying that the record was not inserted (which, so far, it always has been). Maybe I'm thinking about this wrong? - I've tried various permutations of doing the INSERT synchronously (
datasource.connection.use { ... }
) and asynchronously (e.g. viaclient.preparedStatement
. Can I support both? Do I have to choose? - the contents of the notification is simply the ID of the message, which I first need to fetch from the
message
table before invoking client code. Am I constrained in how I do this?
I don't particularly need an answer to all/any of the above, I think I can figure it out once I see some working code and iterate from there, but I just can't seem to get there.
Would anybody be willing to provide a minimal example of how to get from a val channel: PgChannel = pgSubscriber.channel(topic)
to an API that has the transactional semantics described above, and also provide a simple demonstration of how I would do "Insert a record in the handler/downstream, then throw an exception"?
Thanks!
I'm trying to learn Quarkus by creating a simple MQ above Postgres. I'm sticking to JDBC to keep things as performant as possible, and I've set up a trivial messages
table along with triggers that NOTIFY
when a new record is added.
I want to use PgSubscriber to LISTEN
to these notifications, and provide an API that would allow you to react to each such notification - either by passing a handler, or by exposing a Multi
.
Here's the crucial thing: I want the semantics of the API to be that each notification is wrapped in its own transaction, including the client handler/downstream operations. I've tried @Transactional
, QuarkusTransaction.<whatever>
, client.withTransaction
and who knows what else, and I just can't get it to work.
The worst part is, I'm not even sure which parts I might be doing wrong:
- I don't care if the API is exposed via a
Multi
or a lambda, but maybe wanting to wrap the thing in a transaction constrains this choice, and I'm not realizing it? - I'm testing this by executing an
INSERT
in the handler/downstream operations, then throwing an exception, and verifying that the record was not inserted (which, so far, it always has been). Maybe I'm thinking about this wrong? - I've tried various permutations of doing the INSERT synchronously (
datasource.connection.use { ... }
) and asynchronously (e.g. viaclient.preparedStatement
. Can I support both? Do I have to choose? - the contents of the notification is simply the ID of the message, which I first need to fetch from the
message
table before invoking client code. Am I constrained in how I do this?
I don't particularly need an answer to all/any of the above, I think I can figure it out once I see some working code and iterate from there, but I just can't seem to get there.
Would anybody be willing to provide a minimal example of how to get from a val channel: PgChannel = pgSubscriber.channel(topic)
to an API that has the transactional semantics described above, and also provide a simple demonstration of how I would do "Insert a record in the handler/downstream, then throw an exception"?
Thanks!
Share Improve this question asked yesterday Gabriel ShanahanGabriel Shanahan 314 bronze badges1 Answer
Reset to default 1Looks like the night is darkest before dawn - I finally managed to crack a working version, which then allowed me to finally make sense of all the stuff that was confusing me.
The heart of my confusions was this: transactions in JDBC (and anything that builds on it) and reactive clients are completely incompatible and cannot be interchanged. This is because, fundamentally, they go through entirely different database connections, managed by entirely different pools and clients:
Jdbc goes through the regular blocking client (
io.quarkus:quarkus-jdbc-postgresql
) that is managed by Agroalreactive clients go through Vert.x, which has it's own connections and its own pool
As a consequence:
@Transactional
annotations have no effect on reactive clients, and neither do all other mechanism that do essentially the same thing, e.g.QuarkusTransaction
transactions on reactive clients (
pool.withTransaction
) have no effect on JDBC queries (such as those done viadatasource.connection.use { ... }
)Crucially, nothing can be done about that - fundamentally, a transaction is owned by the connection, and the reactive and jdbc clients both hold their own, which are not compatible -
io.vertx.sqlclient.impl.Connection
vsjava.sql.Connection
(could perhaps be done in theory by somehow hacking out the raw socket information from one and injecting it into the other, but that's definitely not what's done out of the box)
Now, a big reason for this confusion was what is said in the docs on transactions and reactive extensions, as from that, it seemed like these two worlds are interoperable. However, this only applied to reactive pipelines using JDBC connections, and NOT to reactive pipelines using reactive clients. For pipelines using JDBC connections, and only those, the JDBC transaction is propagated via context propagation so its lifecycle matches the lifecycle of the reactive pipeline, not the function from which it is returned.
Another source of confusion: for the reactive client specifically, if you want to perform multiple operations within the reactive transaction, you need to manually pass around the connection - unlike with JDBC (and everything that builds on it, such as JPA, Hibernate, etc.) there's no behind-the-scenes magic that extracts the connection from some place. I think this could be done in theory, but it's not done in practice, and this key difference is not really emphasized in the docs.
Given that, the answers to my questions are:
If I want to use reactive clients, it would be somewhere between cumbersome and impossible to return a
Multi
, since I have to use.withTransaction { }
. I could, theoretically, just useconnection.begin()
, but then the client would need to callmit
manually, which would make the API pretty cumbersome. I haven't tried exposing aMulti
with normal JDBC, but my gut says that should be doable given the builtin context propagation.Testing it via
INSERT
is fine, just as long as thatINSERT
is executed in the same connection as the one that was opened in the previous step, which implies using the same method as the previous point does (reactive or JDBC). For reactive clients, that additionally means passing along theConnection
, for JDBC, this can be taken care of e.g. via@Transactional
annotations.No, I cannot support both, at least not via a single API. I need to either either go full reactive client, or full JDBC. As stated in the previous point, that implies how I have to do the INSERT.
Yes, I am constrained in how I do this - either full reactive, or full JDBC, as explained in the previous points.
A quick, dirty, begging-to-be-cleaned-up version that works with the reactive approach is
// Implementation
override fun subscribe(topic: String, handler: SqlConnection.(Message) -> Uni<Unit>, termination: () -> Unit) {
val channel: PgChannel = pgSubscriber.channel(topic)
channel.handler { id ->
client.connection.map { it.begin() }
client.withTransaction { connection ->
connection.preparedQuery("SELECT id, topic, payload, created_at FROM $TABLE_NAME WHERE id = $1")
.execute(Tuple.of(UUID.fromString(id)))
.map { rowSet ->
val row = rowSet.first()
Message(
id = row.getUUID("id"),
topic = row.getString("topic"),
payload = objectMapper.readTree(row.getString("payload")),
createdAt = row.getLocalDateTime("created_at").atZone(ZoneId.systemDefault())
).also { logger.info("Fetched NOTIFY message: id=$it.id, topic=$it.topic") }
}
.emitOn(Infrastructure.getDefaultExecutor())
.flatMap { connection.handler(it) }
.onTermination().invoke(termination)
}.subscribe().with(
{},
{ e -> logger.error("Failed processing message $id", e) }
)
}
}
override fun publish(topic: String, payload: JsonNode): Uni<Message> {
val jsonPayload = payload.toString()
return client
.preparedQuery("INSERT INTO $TABLE_NAME (topic, payload) VALUES ($1, $2::jsonb) RETURNING id, created_at")
.execute(Tuple.of(topic, jsonPayload))
.onItem().transform { rowSet ->
val row = checkNotNull(rowSet.firstOrNull()) { "Unable to publish message for topic $topic" }
Message(
id = row.getUUID("id"),
topic = topic,
payload = payload,
createdAt = row.getLocalDateTime("created_at").atZone(ZoneId.systemDefault())
).also {
logger.info("Published message: id='${it.id}', topic='${it.topic}'")
}
}
}
// Test
@Test
fun `subscribe should isolate transactions between messages`() {
val latch = CountDownLatch(2)
val failedMessageIndex = AtomicInteger(-1)
val successMessageIndex = AtomicInteger(-1)
val otherTopic = "otherTopic"
val otherPayload = objectMapper.createObjectNode().put("otherIndex", 1)
// Subscribe with a handler that will fail for one message but succeed for another
messageQueue.subscribe(testTopic, { message ->
val jsonPayload = otherPayload.toString()
preparedQuery("INSERT INTO messages (topic, payload) VALUES ($1, $2::jsonb)")
.execute(Tuple.of(otherTopic, jsonPayload))
.map {
val index = message.payload.get("index").asInt()
if (index == 1) {
successMessageIndex.set(index)
} else if (index == 2) {
failedMessageIndex.set(index)
// Throwing an exception to simulate a failure
throw IllegalStateException("Simulated failure for message 2")
}
}
}, { latch.countDown() })
// Publish two messages
val payload1 = objectMapper.createObjectNode().put("index", 1)
messageQueue.publish(testTopic, payload1).await().indefinitely()
val payload2 = objectMapper.createObjectNode().put("index", 2)
messageQueue.publish(testTopic, payload2).await().indefinitely()
// Wait for both messages to be processed
assertTrue(latch.await(1, TimeUnit.SECONDS))
// Verify the first message was processed successfully
assertEquals(1, successMessageIndex.get())
// Verify the second message attempted processing but failed
assertEquals(2, failedMessageIndex.get())
// Verify only one message was published to otherTopic
var otherTopicMessageCount = dataSource.connection.use { connection ->
val statement = connection.prepareStatement(
"SELECT count(*) FROM messages WHERE topic = ?"
)
statement.setString(1, otherTopic)
val resultSet = statement.executeQuery()
resultSet.next()
resultSet.getInt(1)
}
assertEquals(1, otherTopicMessageCount, "Only one message should have been published to otherTopic")
}
Hope this helps any wanderers that stumble upon this.
本文标签: postgresqlPostgres LISTENNOTIFY in Quarkustransaction for each notificationStack Overflow
版权声明:本文标题:postgresql - Postgres LISTENNOTIFY in Quarkus - transaction for each notification - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1743783309a2538229.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论