Syntax

ALTER SOURCE current_source_name
    alter_option;

alter_option depends on the operation you want to perform on the source. For all supported clauses, see the sections below.

Clause

ADD COLUMN

ALTER SOURCE source_name
    ADD COLUMN col_name data_type;
Parameter or clauseDescription
ADD COLUMNThis clause adds a column to the specified source.
col_nameThe name of the new column you want to add to the source.
data_typeThe data type of the newly added column. With the struct data type, you can create a nested table. Elements in a nested table need to be enclosed with angle brackets (<>).
-- Add a column named "v3" to a source named "src1"
ALTER SOURCE src1
    ADD COLUMN v3 int;
  • To alter columns in a source created with a schema registry, see REFRESH SCHEMA.
  • You cannot add a primary key column to a source or table in RisingWave. To modify the primary key of a source or table, you need to recreate the table.
  • You cannot remove a column from a source in RisingWave. If you intend to remove a column from a source, you’ll need to drop the source and create the source again.

REFRESH SCHEMA

Fetch the latest schema from the schema registry and update the source schema.

ALTER SOURCE source_name REFRESH SCHEMA;

Currently when refreshing the schema registry of a source, it is not allowed to drop columns or change types.

For example, assume we have a source as follows:

Create a source
CREATE SOURCE src_user WITH (
    connector = 'kafka',
    topic = 'sr_pb_test',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
    schema.registry = 'http://message_queue:8081',
    message = 'test.User'
);

Then we can refresh its schema with the following statement:

Refresh schema
ALTER SOURCE src_user REFRESH SCHEMA;

RENAME TO

ALTER SOURCE source_name
    RENAME TO new_source_name;
Parameter or clauseDescription
RENAME TOThis clause changes the name of the source.
new_source_nameThe new name of the source.
-- Change the name of a source named "src" to "src1"
ALTER SOURCE src
   RENAME TO src1;

OWNER TO

ALTER SOURCE current_source_name
    OWNER TO new_user;
Parameter or clauseDescription
OWNER TOThis clause changes the owner of the source.
new_userThe new owner you want to assign to the source.
-- Change the owner of the source named "src" to user "user1"
ALTER SOURCE src OWNER TO user1;

SET SCHEMA

ALTER SOURCE current_source_name
    SET SCHEMA schema_name;
Parameter or clauseDescription
SET SCHEMAThis clause moves the source to a different schema.
schema_nameThe name of the schema to which the source will be moved.
-- Move the source named "test_source" to the schema named "test_schema"
ALTER SOURCE test_source SET SCHEMA test_schema;

SET SOURCE_RATE_LIMIT

ALTER SOURCE source_name
    SET SOURCE_RATE_LIMIT { TO | = } { default | rate_limit_number };

Use this statement to modify the rate limit of a source. For the specific value of SOURCE_RATE_LIMIT, refer to How to view runtime parameters.

For a newly created materialized view on a source with historical data e.g. Kafka source, it will backfill from the source. The backfilling process will not be affected by the SOURCE_RATE_LIMIT of the source.

To modify the rate limit of the backfilling process, please refer to SET BACKFILL_RATE_LIMIT.

Example
-- Alter the rate limit of a source to default
ALTER SOURCE kafka_source SET source_rate_limit TO default;
Example
-- Alter the rate limit of a source to 1000
ALTER SOURCE kafka_source SET source_rate_limit TO 1000;

SWAP WITH

ALTER SOURCE name
SWAP WITH target_name;
ParameterDescription
nameThe current name of the source to swap.
target_nameThe target name of the source you want to swap with.
-- Swap the names of the api_data source and the file_data source.
ALTER SOURCE api_data
SWAP WITH file_data;