pgstream v0.4.0: Postgres-to-Postgres replication, snapshots & transformations
Learn how the latest features in pgstream refine Postgres replication with near real-time data capture, consistent snapshots, and column-level transformations.
Author: Esther Minano Sanz
Categories: open-source, postgres, schema, pgstream, replication, transformers, snapshots, oss
We're proud to announce the release of v0.4.0 of pgstream, our open-source CDC tool for Postgres! 🚀 In this blog post, we'll dive into some of the key features packed into this latest release and look at what the future holds!
You can find the complete release notes on the GitHub v0.4.0 release page.
What is pgstream?
pgstream is an open-source CDC (Change Data Capture) tool and library that offers Postgres replication support with DDL changes. Some of its key features include:
- Replication of DDL changes: schema changes are tracked and seamlessly replicated downstream alongside the data, avoiding manual intervention and data loss.
- Modular deployment configuration: pgstream's modular implementation allows it to be configured for simple use cases, removing unnecessary complexity and deployment challenges. However, it can also easily integrate with Kafka for more complex use cases.
- Out of the box replication outputs:
- Elasticsearch/Opensearch: replication to search stores with special handling of field IDs to minimise re-indexing.
- Webhooks: subscribe and receive webhook notifications whenever your source data changes.
For more details on how pgstream works under the hood, check out the full documentation.
What's new?
The new release brings three powerful features to pgstream: Postgres to Postgres replication, snapshots, and transformers. Let's dive into each of them!
If you want a sneak peek, this demo shows all three in action 💪.
Postgres to Postgres replication
This release adds a new pgstream output to support replication to Postgres databases. The Postgres processor processes the WAL events and translates them into SQL queries, which are then sent to the target Postgres database, ensuring efficient, near real-time replication of both data and schema changes. The implementation includes:
- Batch processing: queries are grouped into batches to optimize performance and reduce I/O overhead.
- Schema change handling: schema changes (e.g., adding or renaming columns) are automatically replicated.
- Memory guarding: replication lag is kept in check by configurable memory guards.
In contrast to existing Postgres tools, such as pg_dump/pg_restore and Postgres logical replication, pgstream keeps track of schema changes as well as data, making the replication pipeline more robust.
Example
Imagine you're replicating data from a production Postgres database to a staging environment for testing. By using the following configuration, pgstream will start listening on the replication slot of the source database and replicate changes to the target staging database in near real time.
```sh
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://prod_user:password@prod-db:5432/prod"
# Processor config
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://staging_user:password@staging-db:5432/staging"
PGSTREAM_POSTGRES_WRITER_BATCH_SIZE=100
PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT=1s
```
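Assuming those variables are saved to an environment file (the name pg2pg.env below is ours, not mandated by pgstream), the pipeline can then be initialized and started with the pgstream CLI. The exact flags may differ between versions, so defer to the tutorial:

```sh
# Prepare the source database: creates the pgstream schema and the replication slot.
pgstream init --pgurl "postgres://prod_user:password@prod-db:5432/prod"

# Start replicating using the configuration above (pg2pg.env is a hypothetical file name).
pgstream run -c pg2pg.env
```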
Check out the Postgres to Postgres tutorial for more details on how to set up and run replication with pgstream.
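Once the pipeline is running, you can see the schema replication in action by applying a DDL change on the source and watching it appear on the target. A minimal sketch, assuming a public.users table exists on both sides:

```sh
# Add a column on the source database...
psql "postgres://prod_user:password@prod-db:5432/prod" \
  -c "ALTER TABLE public.users ADD COLUMN last_login timestamptz;"

# ...and shortly after, the same column shows up on the staging target.
psql "postgres://staging_user:password@staging-db:5432/staging" \
  -c "\d public.users"
```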
Snapshots
With the addition of snapshots to pgstream, it's now possible to capture a consistent view of your Postgres database at a specific point in time, be it as an initial snapshot of the database before starting replication, or as a standalone process when replication is not needed.
The snapshot listener produces the snapshot in different ways for the schema and the data in order to optimise the implementation for the different outputs:
- Schema snapshots: Capture the database schema using the pgstream.schema_log table, or the Postgres utilities pg_dump/pg_restore for Postgres to Postgres replication.
- Data snapshots: Use transaction snapshot IDs to obtain a stable view of the data, allowing tables and rows to be processed in parallel (see the sketch below). When combined with replication, the current LSN is stored before starting the snapshot to ensure any WAL events that occur during the snapshot can be replayed.
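The data side builds on standard Postgres snapshot exports. Here's an illustrative sketch of that underlying mechanism, not pgstream's actual code; the snapshot ID and table name are made up, and in real use both sessions must be open concurrently (they're shown sequentially for brevity):

```sh
# Coordinator session: open a transaction and export its snapshot ID,
# keeping the transaction open while the workers run.
psql "postgres://prod_user:password@prod-db:5432/prod" <<'SQL'
BEGIN ISOLATION LEVEL REPEATABLE READ;
SELECT pg_export_snapshot(); -- returns an ID such as '00000003-0000001B-1'
SQL

# Worker sessions: attach to the exported snapshot before reading, so every
# worker sees the exact same view of the data, enabling safe parallelism.
psql "postgres://prod_user:password@prod-db:5432/prod" <<'SQL'
BEGIN ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
SELECT count(*) FROM public.users; -- stable view as of the exported snapshot
SQL
```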
For more details on how the snapshots are implemented, check out the snapshot section in the pgstream documentation.
Example with initial snapshot and replication
You're setting up a new analytics pipeline and need to initialize the target Postgres database with existing data. Snapshots ensure that all data is captured before replication begins.
```sh
# Listener config
# URL of the PostgreSQL database we want to snapshot
PGSTREAM_POSTGRES_LISTENER_URL="postgres://prod_user:password@prod-db:5432/prod"
PGSTREAM_POSTGRES_LISTENER_INITIAL_SNAPSHOT_ENABLED=true
# List of tables we want to snapshot. If the tables are not schema qualified, the public schema will be assumed.
# Wildcards are supported.
PGSTREAM_POSTGRES_INITIAL_SNAPSHOT_TABLES="*"
# Processor config
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://analytics_user:password@analytics-db:5432/analytics"
```
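Once the initial snapshot completes and replication takes over, a quick spot check on the target confirms the data landed. A hypothetical example (the table name is assumed):

```sh
# Row counts on source and target should match for a snapshotted table.
psql "postgres://analytics_user:password@analytics-db:5432/analytics" \
  -c "SELECT count(*) FROM public.orders;"
```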
Example with standalone snapshots
You want to have a realistic staging database, without the need for continuous replication, so you perform snapshots weekly.
```bash
# Listener config
PGSTREAM_POSTGRES_SNAPSHOT_LISTENER_URL="postgres://prod_user:password@prod-db:5432/prod"
PGSTREAM_POSTGRES_SNAPSHOT_TABLES="*"
# Processor config
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://staging_user:password@staging-db:5432/staging"
```
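Since this is a one-off process rather than continuous replication, scheduling is up to you; one minimal option is a weekly cron entry (the snapshot.env file name and the CLI invocation follow the tutorials and are assumptions):

```sh
# Run the standalone snapshot every Sunday at 03:00.
0 3 * * 0  pgstream run -c snapshot.env
```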
Check out the snapshot tutorial for more details on how to set up and perform snapshots with pgstream.
Transformers
Transformers in pgstream allow you to modify column values during replication or snapshots. This is particularly useful for anonymizing sensitive data, which is critical for:
- Compliance: Meeting legal and regulatory requirements by masking or modifying PII before it reaches unauthorized systems.
- Security: Preventing sensitive data leaks by ensuring that only de-identified information is replicated.
- Testing & development: Providing developers with realistic datasets while safeguarding personal information.
- Analytics & reporting: Enabling insights from anonymized data without exposing private user details.
The pgstream implementation of transformers acts as a wrapper around the existing processors, which means they can be applied to any of the available outputs (Elasticsearch/OpenSearch, webhooks, Postgres). It leverages open-source libraries, such as Greenmask and Neosync, and can integrate with custom transformers.
The column transformation rules are defined in a YAML configuration file.
Transformer rules example
```yaml
transformations:
- schema: public
table: users
column_transformers:
email:
name: neosync_email
parameters:
preserve_length: true # Ensures the transformed email has the same length as the original.
preserve_domain: true # Keeps the domain of the original email intact.
email_type: fullname # Specifies the type of email transformation.
```
To see the list of supported transformers, check out the transformers section of the pgstream documentation.
Example
Let's say we need to anonymize email addresses before replicating user data to a staging database. We can configure a pgstream transformer to modify the email field while keeping other information intact by pointing to the previous example transformation rules in the pgstream configuration.
```bash
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://prod_user:password@prod-db:5432/prod"
# Processor config
# Path to the transformer rules yaml file
PGSTREAM_TRANSFORMER_RULES_FILE="transformer_rules.yaml"
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://staging_user:password@staging-db:5432/staging"
```
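After the pipeline runs, the transformation is easy to verify: emails on the target should be anonymized while every other column is untouched. For example:

```sh
# The public.users emails on staging should no longer contain real addresses.
psql "postgres://staging_user:password@staging-db:5432/staging" \
  -c "SELECT email FROM public.users LIMIT 3;"
```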
Check out the transformers tutorial for more details on how to set up and use transformers with pgstream.
Conclusion
With the latest features discussed in this blog post, you can build robust, compliant, and efficient data workflows. Whether you're replicating data to downstream systems, anonymizing sensitive information, or creating snapshots, pgstream has the tools you need.
If you have any suggestions or questions, you can reach out to us on Discord or follow us on X / Twitter. We welcome any feedback in issues, or contributions via pull requests! 💜
Ready to get started? Check out the pgstream documentation for more details.