Life of a Stream
How VReplication replicates data
When a VReplication workflow runs, data is copied from source to target shards. Each target
PRIMARY tablet runs one
vreplication stream (
vstream) for each source shard that the target's
keyrange overlaps with.
The diagram below outlines how one such stream operates. VReplication can be asked to start from a specific
GTID or from the start. When starting from a
GTID the replication mode is used where it
streams events from the binlog.
Full Table Copy #
If the entire table data is requested then the simple streaming done by the replication mode can create an avalanche of events (think 100s of millions of rows). Moreover, and more importantly, it is highly likely that necesasry older binlogs are no longer available.
So a copy/catchup mode is initiated first: data in the tables are copied over in a consistent manner using bulk inserts. Once we have copied enough data so that we are close enough to the current position (when replication lag is low) it switches over to, and forever stays in, the replication mode. All future replication is done only by streaming binlog events.
While we may have multiple database sources in a workflow each
vstream has just one source and one target. The source is
vttablet (and hence one
mysqld instance). The target could be another
vttablet (when resharding) or a
streaming gRPC response (for
vstream API clients).
Transformation and Filtering #
Note that for all steps the data selected from the source will only be from the tables specified
Match field of the Rule
specification of the VReplication workflow. Furthermore, if a
Filter is specified for a table it will
be applied before being sent to the target. Columns may also be transformed based on the Filter’s
Source and Sink #
Each stream has two actors: the target initiates streaming by making gRPC calls to the source tablet and the source
tablet sources the data by connecting to its underlying
mysqld server as a replica (while replicating) or using SQL
queries (in the copy phase) and streams it to the target. The target takes appropriate action: in case of resharding it
will convert the events into CRUD SQL statements and apply them to the target database. In case of
API clients the events are forwarded by
vtgate to the client.
Note that the target always pulls data. If the source pushes data, there are chances of buffer overruns if the target is
not able to process them in time. For example, in resharding workflows we need to convert the events to SQL
statements and execute them on the target's mysqld instance, which are usually much slower than just selecting data on
Modes, in Detail #
This is the easiest to understand. The source stream just acts like a MySQL replica and processes events as they are
received. Events, after any necessary filtering and transformation, are sent to the target. Replication runs
continuously with short sleeps when there are no more events to source. Periodic heartbeats are sent to the target to
signal liveness. You will see this reflected with the
Running state for the workflow.
Initialize is called at the start of the copy phase. For each table to be copied an entry is created in the internal
_vt.copy_state table with a null primary key (PK). As each table copy is completed the related entries are deleted
and when there are no more entries for this workflow the copy phase is considered complete and the workflow moves into
the replication mode which you will see reflected with the
Running state for the workflow.
Copy works on one table at a time. The source selects a set of rows from the table, for primary keys greater than the
ones copied so far, using a consistent snapshot. This results in a stream of rows to be sent to the target which
generates a bulk
INSERT for these rows. You will see this reflected with the
Copying state for the workflow.
However, there are a couple of factors which complicate our story:
- Each copy selects all rows until the current position of the binlog, but,
- Since transactions continue to be applied (presuming the database is online) the GTID position is continuously moving forward
Consider this example:
We have two tables
Y. Each table has 20 rows and we copy 10 rows at a time. (The queries below simplified
The queries for the copy phase of
X will be:
T1: select * from X where pk > 0 limit 10; GTID: 100, Last PK 10 send rows to target T2: select * from X where pk > 10 limit 10; GTID: 110, Last PK 20 send rows to target
There is a gotcha here: onsider that there are 10 new transactions or GTIDs between times T1 and T2. Some of these can potentially modify the rows returned from the query at T1. Hence if we just return the rows from T2 (which have only rows from PK 11 to 20) then we will have an inconsistent state on the target: the updates to rows with PK between 1 and 10 will not be present.
This means that we need to first stream the events between GTIDs 100 and 110 for primary keys between 1 and 10 first and then do the second select:
T1: select * from X where pk > 0 limit 10; GTID: 100, Last PK 10 send rows to target T2: replicate from 100 to current position (110 from previous example), only pass events for pks 1 to 10 of X T3: select * from X where pk > 10 limit 10; GTID: 112, Last PK 20 send rows to target
Another gotcha: note that at time T3 when we selected the PKs from 11 to 20 the GTID position could have moved further! This could be due to transactions that were applied between T2 and T3. So if we just applied the rows from T3 we would still have an inconsistent state, if transactions 111 and 112 affected the rows from pks 1 to 10.
This leads us to the following flow:
T1: select * from X where pk > 0 limit 10; GTID: 100, Last PK 10 send rows to target T2: replicate from 100 to current position (110 from previous example), only pass events for pks 1 to 10 T3: select * from X where pk > 10 limit 10; GTID: 112, Last PK 20 T4: replicate from 111 to 112 only pass events for pks 1 to 10 T5: Send rows for pks 11 to 20 to target
This flow actually works and is the one used in Vitess VReplication!
The transactions to be applied at T1 can take a long time (due to the bulk inserts). T3 (which is just a snapshot) is quick. So the position can diverge much more at T2 than at T4. Hence, we call step T2 "Catchup" and step T4 "Fast Forward".
As detailed above the catchup phase runs between copy phase cycles (time limited by the
vreplication_copy_phase_max_duration flag). During the copy phase the
GTID position can move significantly ahead. So we run a catchup and fast-forward phase until we come close to the current
position — i.e. the replication lag is small. At that point we execute another Copy cycle.
Fast Forward #
During the copy phase we first take a snapshot. Then we fast-forward: we replicate from the gtid position where we stopped the Catchup to the position of the new snapshot.
Finally once we have finished copying all the tables we proceed to the replicate or
Running phase until our job is done:
for example if we have resharded and switched over the reads and writes to the new shards or when the
vstream API client closes its connection.