To stream data out of RisingWave, you must create a sink. A sink is an external target to which you can send data. Use the CREATE SINK statement to create a sink. You need to specify the data to export, the format, and the sink parameters.
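
For example, the following statement streams data from a materialized view into a Kafka topic. This is a minimal sketch, not a complete reference: the materialized view mv_orders, the broker address, and the topic name are placeholders, and the exact parameters depend on the connector you choose.

-- Minimal sketch: sink an (assumed) append-only materialized view to Kafka.
-- mv_orders, the broker address, and the topic name are placeholders.
CREATE SINK orders_sink FROM mv_orders
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 'orders'
) FORMAT PLAIN ENCODE JSON;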

Sinks become visible right after you create them, regardless of the backfilling status. Therefore, it’s important to understand that the data in the sinks may not immediately reflect the latest state of their upstream sources due to the latency of the sink, connector, and backfilling process. To determine whether the process is complete and the data in the sink is consistent, refer to Monitor statement progress.

Currently, RisingWave supports the following sink connectors:

  • Apache Doris sink connector (connector = 'doris') With this connector, you can sink data from RisingWave to Apache Doris. For details about the syntax and parameters, see Sink data to Apache Doris.
  • Apache Iceberg sink connector (connector = 'iceberg') With this connector, you can sink data from RisingWave to Apache Iceberg. For details about the syntax and parameters, see Sink data to Apache Iceberg.
  • AWS Kinesis sink connector (connector = 'kinesis') With this connector, you can sink data from RisingWave to AWS Kinesis. For details about the syntax and parameters, see Sink data to AWS Kinesis.
  • Cassandra and ScyllaDB sink connector (connector = 'cassandra') With this connector, you can sink data from RisingWave to Cassandra or ScyllaDB. For details about the syntax and parameters, see Sink data to Cassandra or ScyllaDB.
  • ClickHouse sink connector (connector = 'clickhouse') With this connector, you can sink data from RisingWave to ClickHouse. For details about the syntax and parameters, see Sink data to ClickHouse.
  • CockroachDB sink connector (connector = 'jdbc') With this connector, you can sink data from RisingWave to CockroachDB. For details about the syntax and parameters, see Sink data to CockroachDB.
  • Delta Lake sink connector (connector = 'deltalake') With this connector, you can sink data from RisingWave to Delta Lake. For details about the syntax and parameters, see Sink data to Delta Lake.
  • Elasticsearch sink connector (connector = 'elasticsearch') With this connector, you can sink data from RisingWave to Elasticsearch. For details about the syntax and parameters, see Sink data to Elasticsearch.
  • Google BigQuery sink connector (connector = 'bigquery') With this connector, you can sink data from RisingWave to Google BigQuery. For details about the syntax and parameters, see Sink data to Google BigQuery.
  • Google Pub/Sub sink connector (connector = 'google_pubsub') With this connector, you can sink data from RisingWave to Google Pub/Sub. For details about the syntax and parameters, see Sink data to Google Pub/Sub.
  • JDBC sink connector for MySQL, PostgreSQL, or TiDB (connector = 'jdbc') With this connector, you can sink data from RisingWave to JDBC-available databases, such as MySQL, PostgreSQL, or TiDB. When sinking to a database with a JDBC driver, ensure that the corresponding table created in RisingWave has the same schema as the table in the database you are sinking to. For details about the syntax and parameters, see Sink to MySQL, Sink to PostgreSQL, or Sink to TiDB.
  • Kafka sink connector (connector = 'kafka') With this connector, you can sink data from RisingWave to Kafka topics. For details about the syntax and parameters, see Sink data to Kafka.
  • MQTT sink connector (connector = 'mqtt') With this connector, you can sink data from RisingWave to MQTT topics. For details about the syntax and parameters, see Sink data to MQTT.
  • NATS sink connector (connector = 'nats') With this connector, you can sink data from RisingWave to NATS. For details about the syntax and parameters, see Sink data to NATS.
  • Pulsar sink connector (connector = 'pulsar') With this connector, you can sink data from RisingWave to Pulsar. For details about the syntax and parameters, see Sink data to Pulsar.
  • Redis sink connector (connector = 'redis') With this connector, you can sink data from RisingWave to Redis. For details about the syntax and parameters, see Sink data to Redis.
  • Snowflake sink connector (connector = 'snowflake') With this connector, you can sink data from RisingWave to Snowflake. For details about the syntax and parameters, see Sink data to Snowflake.
  • StarRocks sink connector (connector = 'starrocks') With this connector, you can sink data from RisingWave to StarRocks. For details about the syntax and parameters, see Sink data to StarRocks.
  • Microsoft SQL Server sink connector (connector = 'sqlserver') With this connector, you can sink data from RisingWave to Microsoft SQL Server. For details about the syntax and parameters, see Sink data to SQL Server.

Sink decoupling

Typically, sinks in RisingWave operate in a blocking manner. This means that if the downstream target system experiences performance fluctuations or becomes unavailable, it can potentially impact the stability of the RisingWave instance. However, sink decoupling can be implemented to address this issue.

Sink decoupling introduces a buffering queue between a RisingWave sink and the downstream system. This buffering mechanism helps maintain the stability and performance of the RisingWave instance, even when the downstream system is temporarily slow or unavailable.

PUBLIC PREVIEW

This feature is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our Slack channel. Your input is valuable in helping us improve this feature. For more details, see our Public Preview Feature List.

The sink_decouple session variable can be specified to enable or disable sink decoupling. The default value for the session variable is default.

To enable sink decoupling for all sinks created in the session, set sink_decouple to true or enable.

SET sink_decouple = true;

To disable sink decoupling, set sink_decouple to false or disable, regardless of the default setting.

SET sink_decouple = false;

Sink decoupling is enabled by default for all sinks in RisingWave.

The internal system table rw_sink_decouple is provided so you can check whether a created sink has sink decoupling enabled.

dev=> select sink_id, is_decouple from rw_sink_decouple;
 sink_id | is_decouple
---------+-------------
       2 | f
       5 | t
(2 rows)
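
To see sink names alongside their decoupling status, you can join against the rw_sinks catalog table. This is a sketch and assumes that rw_sinks exposes id and name columns in your RisingWave version.

-- Sketch: show sink names next to their decoupling status.
-- Assumes rw_sinks exposes id and name columns.
SELECT s.name, d.is_decouple
FROM rw_sinks s
JOIN rw_sink_decouple d ON d.sink_id = s.id;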

Upsert sinks and primary keys

For each sink, you can specify the data format. The available data formats are upsert, append-only, and debezium. To determine which data formats are supported by each sink connector, refer to the detailed guides listed above.

In the upsert sink, a non-null value updates the last value for the same key or inserts a new value if the key doesn’t exist. A NULL value indicates the deletion of the corresponding key.

When creating an upsert sink, note whether or not you need to specify the primary key in the following situations; see the sketch after this list.

  • If the downstream system supports primary keys and the table in the downstream system has a primary key, you must specify the primary key with the primary_key field when creating an upsert JDBC sink.
  • If the downstream system supports primary keys but the table in the downstream system has no primary key, then RisingWave does not allow users to create an upsert sink. A primary key must be defined in the table in the downstream system.
  • If the downstream system does not support primary keys, then users must define the primary key when creating an upsert sink.
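
The sketch below illustrates the first case with an upsert JDBC sink to PostgreSQL. The connection URL, target table, and id column are assumptions for illustration only.

-- Sketch: upsert JDBC sink to PostgreSQL. The JDBC URL, target table name,
-- and primary key column are placeholders.
CREATE SINK pg_upsert_sink FROM mv_orders
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://localhost:5432/mydb?user=postgres&password=secret',
    table.name = 'orders',
    type = 'upsert',
    primary_key = 'id'
);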

Sink data in Parquet or JSON format

PUBLIC PREVIEW

Sinking data in Parquet format is currently in public preview, meaning it is nearing the final product but may not yet be fully stable. If you encounter any issues or have feedback, please reach out to us via our Slack channel. Your input is valuable in helping us improve this feature. For more details, see our Public Preview Feature List.

RisingWave supports sinking data in Parquet or JSON formats to cloud storage services, including S3, Google Cloud Storage (GCS), Azure Blob Storage, and WebHDFS. This eliminates the need for complex data lake setups. Once the data is saved, the files can be queried using RisingWave’s batch processing engine through the file_scan API. You can also leverage third-party OLAP query engines to enhance data processing capabilities.

Below is an example to sink data to S3:

CREATE SINK test_file_sink FROM test
WITH (
    connector = 's3',
    s3.region_name = '{config['S3_REGION']}',
    s3.bucket_name = '{config['S3_BUCKET']}',
    s3.credentials.access = '{config['S3_ACCESS_KEY']}',
    s3.credentials.secret = '{config['S3_SECRET_KEY']}',
    s3.endpoint_url = 'https://{config['S3_ENDPOINT']}',
    s3.path = '',
    type = 'append-only',
    force_append_only='true'
) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');

The file sink currently supports only append-only mode, so make sure the query is append-only and specify this explicitly after the FORMAT ... ENCODE ... statement.
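
After the files are written, you can query them back with RisingWave's file_scan table function in a batch query. The snippet below is only a sketch: the argument order (format, connector, region, credentials, file location) and the bucket path are assumptions; check the file_scan documentation for the exact signature.

-- Sketch only: read the sink output back with file_scan.
-- The argument order and the bucket path are assumptions.
SELECT *
FROM file_scan(
    'parquet',
    's3',
    'us-east-1',
    'your_access_key',
    'your_secret_key',
    's3://your-bucket/path/'
);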

Batching strategy for file sink

The batching strategy for file sinks was introduced in version 2.1 and is not supported in earlier versions. If you want to use it, upgrade to version 2.1 or later.

RisingWave implements batching strategies for file sinks to optimize file management by preventing the generation of numerous small files. The batching strategy is available for the Parquet, JSON, and CSV encodings.

Category

  • Batching based on row numbers: RisingWave monitors the number of rows written and completes the file once the maximum row count threshold is reached.

  • Batching based on rollover interval: RisingWave checks the threshold each time a chunk is about to be written and when a barrier is encountered.

If no batching strategy is specified, RisingWave defaults to writing a new file every 10 seconds.

The condition for batching is relatively coarse-grained. The actual number of rows or exact timing of file completion may vary from the specified thresholds, as this function is intentionally flexible to prioritize efficient file management.

File organization

You can use path_partition_prefix to organize files into subdirectories based on their creation time. The available options are month, day, or hour. If not specified, files will be stored directly in the root directory without any time-based subdirectories.

Regarding file naming rules, files currently follow the pattern <path_partition_prefix (optional)>/<executor_id>_<timestamp>.<suffix>, where the timestamp differentiates files batched by the rollover interval.

The output files look like below:

path/2024-09-20/47244640257_1727072046.parquet
path/2024-09-20/47244640257_1727072055.parquet

Example

CREATE SINK s1 
FROM t
WITH (
    connector = 's3',
    max_row_count = '100',
    rollover_seconds = '10',
    type = 'append-only',
    path_partition_prefix = 'day'
) FORMAT PLAIN ENCODE PARQUET (force_append_only=true);

In this example, writing of a file is completed once the number of rows exceeds 100 or writing has continued for more than 10 seconds. Once completed, the file becomes visible in the downstream sink system.