Потоковая передача событий изменений (Change Event Streaming, CES) — одна из самых ожидаемых новых функций, которая появится в SQL Server 2025. Она позволяет непрерывно передавать поэтапные изменения из ваших таблиц напрямую в Azure Event Hubs, где несколько приложений-потребителей могут подписываться на данные событий в реальном времени.
Примечание: Эта статья основана на SQL Server 2025 CTP 2.1. Синтаксис и поведение могут претерпеть незначительные изменения к моменту выпуска продукта. Потоковая передача событий изменений (CES) в конечном итоге будет поддерживаться во всех редакциях SQL Server, включая SQL Server 2025 для Windows, SQL Server 2025 для Linux, Azure SQL Database и Managed Instance.
В этой серии из двух частей я покажу, как настроить и сконфигурировать CES (Часть 1), а затем как создать приложение-потребитель для обработки передаваемых изменений (Часть 2).
Давайте начнём!
Шаг 1: Создание концентратора событий (Event Hub)
Прежде чем SQL Server сможет передавать изменения, вам нужна целевая точка назначения. CES предназначена для прямой передачи в Azure Event Hubs.
Создание пространства имён концентратора событий
Концентратор событий существует внутри пространства имён событий. Чтобы создать новое пространство имён:
- На портале Azure создайте новый ресурс.
- В Marketplace создайте новый ресурс Event Hubs.
- Укажите имя для нового пространства имён Event Hubs в новой или существующей группе ресурсов.
- Выберите ценовую категорию Basic с 1 единицей пропускной способности (достаточно для разработки и тестирования).
- Нажмите «Просмотр и создание», а затем «Создать».
Создание концентратора событий
Теперь вы можете создать новый концентратор событий в новом пространстве имён:
- На странице обзора пространства имён нажмите + Event Hub.
- Укажите имя для нового концентратора событий и оставьте все остальные параметры по умолчанию.
- Нажмите «Просмотр и создание», а затем «Создать».
Создание политики концентратора событий
Теперь создайте политику, которая позволяет управлять концентратором событий:
- В разделе «Параметры» слева нажмите «Политики общего доступа».
- Нажмите + Добавить, чтобы создать новую политику.
- Укажите имя для политики.
- Установите флажок «Управление» (что автоматически включает «Отправка» и «Прослушивание»).
- Нажмите «Создать».

Создание токена SAS
Наконец, вам понадобится токен подписанного URL-адреса (SAS) для аутентификации SQL Server и других клиентов в Event Hub. К сожалению, портал Azure не предоставляет графический интерфейс для создания токенов SAS для Event Hub, поэтому вы должны сгенерировать его программно с помощью PowerShell, Azure CLI или пакета SDK для Azure. В этом руководстве мы будем использовать PowerShell.
Установка модулей PowerShell
Запустите PowerShell от имени администратора и установите необходимые модули.
Примечание: Эти модули нужно установить на компьютере только один раз. Если вы уже устанавливали их ранее, можете пропустить этот шаг.
# Install the general Azure cmdlets (this can take up to 20 minutes)
Install-Module -Name Az -Scope CurrentUser -Repository PSGallery -Force
# Install the Event Hub module (runs quickly)
Install-Module -Name Az.EventHub -Scope CurrentUser -Force
Создание скрипта для токена SAS
Скопируйте следующий код в новый файл с именем Generate-SasToken.ps1. Этот скрипт адаптирован из документации Microsoft по ссылке: Настройка потоковой передачи событий изменений (предварительная версия).
function Generate-SasToken {
# Provide values for the following resources:
$resourceGroupName = "ces-demo-rg"
$namespaceName = "ces-namespace"
$eventHubName = "ces-hub"
$policyName = "ces-policy"
# Login to Azure and select the Azure Subscription
Connect-AzAccount -InformationAction SilentlyContinue | Out-Null
# Validate the existence of the specified resource group, event hub namespace, and event hub
Get-AzResourceGroup -Name $resourceGroupName -ErrorAction Stop | Out-Null
Get-AzEventHubNamespace -ResourceGroupName $resourceGroupName -Name $namespaceName -ErrorAction Stop | Out-Null
Get-AzEventHub -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -Name $eventHubName -ErrorAction Stop | Out-Null
# Get the event hub authorization policy (it must have Manage rights)
$policy = Get-AzEventHubAuthorizationRule -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName -ErrorAction SilentlyContinue
if (-not ("Manage" -in $policy.Rights)) {
throw "Authorization rule '$policyName' does not exist, or is missing the required 'Manage' right"
}
# Get the Primary Key of the Shared Access Policy
$keys = Get-AzEventHubKey -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName
if (-not $keys) {
throw "Could not obtain Azure Event Hub Key"
}
if (-not $keys.PrimaryKey) {
throw "Could not obtain Primary Key"
}
$primaryKey = ($keys.PrimaryKey)
# Define a function to create the SAS token
function Create-SasToken {
param ([string]$resourceUri, [string]$keyName, [string]$key)
$sinceEpoch = [datetime]::UtcNow - [datetime]"1970-01-01"
$expiry = [int]$sinceEpoch.TotalSeconds + (60 * 60 * 24 * 31 * 6) # 6 months
$stringToSign = [System.Web.HttpUtility]::UrlEncode($resourceUri) + "`n" + $expiry
$hmac = New-Object System.Security.Cryptography.HMACSHA256
$hmac.Key = [Text.Encoding]::UTF8.GetBytes($key)
$signature = [Convert]::ToBase64String($hmac.ComputeHash([Text.Encoding]::UTF8.GetBytes($stringToSign)))
$sasToken = "SharedAccessSignature sr=$([System.Web.HttpUtility]::UrlEncode($resourceUri))&sig=$([System.Web.HttpUtility]::UrlEncode($signature))&se=$expiry&skn=$keyName"
return $sasToken
}
# Construct the resource URI for the SAS token
$resourceUri = "https://$namespaceName.servicebus.windows.net/$eventHubName"
# Generate the SAS token using the primary key from the new policy
$sasToken = Create-SasToken -resourceUri $resourceUri -keyName $policyName -key $primaryKey
# Output the SAS token
Write-Host "`n-- Generated SAS Token --" -ForegroundColor Gray
Write-Host $sasToken -ForegroundColor White
Write-Host "-- End of generated SAS Token --`n" -ForegroundColor Gray
# Copy the SAS token to the clipboard
$sasToken | Set-Clipboard
Write-Host "The generated SAS token has been copied to the clipboard." -ForegroundColor Green
}
Generate-SasToken
В начале скрипта (строки с 4 по 7) укажите значения для вашей группы ресурсов, пространства имён, концентратора событий и политики. Кроме того, скрипт генерирует токен, срок действия которого истекает через 6 месяцев. Чтобы изменить срок действия, отредактируйте присваивание переменной $expiry в строке 42.
Запуск скрипта
Прежде чем выполнить скрипт, необходимо разрешить PowerShell запускать локальные скрипты:
Set-ExecutionPolicy -Scope Process -ExecutionPolicy Bypass
Когда появится запрос на подтверждение, введите A (для «Да для всех») и нажмите Enter.
Теперь запустите скрипт:
.\Generate-SasToken.ps1
Вам будет предложено войти в свою учётную запись Microsoft и выбрать подписку Azure. Затем скрипт сгенерирует токен SAS и отобразит его между строками:
-- Generated SAS Token --
<ваш токен SAS>
-- End of generated SAS Token --
Скрипт также копирует сгенерированный токен SAS в буфер обмена, чтобы вы могли вставить его в Блокнот. Он понадобится вам позже при настройке SQL Server для передачи событий в Event Hubs, а затем снова (в Части 2) при настройке приложений-потребителей, которые впоследствии будут считывать эти события из Event Hubs.
Шаг 2: Создание демонстрационной базы данных
Мы будем использовать небольшую тестовую базу данных для демонстрации CES. Выполните этот скрипт в SSMS, чтобы создать базу данных:
-- Create the demo database
USE master
GO
CREATE DATABASE CesDemo
GO
USE CesDemo
GO
-- Create some demo tables
CREATE TABLE Customer (
CustomerId int IDENTITY PRIMARY KEY,
CustomerName varchar(50),
City varchar(20)
)
GO
SET IDENTITY_INSERT Customer ON
INSERT INTO Customer
(CustomerId, CustomerName, City) VALUES
(1, 'Shutter Bros Wholesale', 'New York'),
(2, 'Aperture Supply Co.', 'Los Angeles')
SET IDENTITY_INSERT Customer OFF
CREATE TABLE Product (
ProductId int IDENTITY PRIMARY KEY,
Name varchar(80),
Color varchar(15),
Category varchar(20),
UnitPrice decimal(8, 2),
ItemsInStock smallint
)
GO
SET IDENTITY_INSERT Product ON
INSERT INTO Product
(ProductId, Name, Color, Category, UnitPrice, ItemsInStock) VALUES
(1, 'Canon EOS R5 Mirrorless Camera', 'Black', 'Camera', 3899.99, 10),
(2, 'Nikon Z6 II Mirrorless Camera', 'Silver', 'Camera', 1996.95, 8),
(3, 'Sony NP-FZ100 Rechargeable Battery', 'Black', 'Accessory', 78.00, 25)
SET IDENTITY_INSERT Product OFF
CREATE TABLE [Order] (
OrderId int IDENTITY PRIMARY KEY,
CustomerId int REFERENCES Customer(CustomerId),
OrderDate datetime2
)
GO
CREATE TABLE OrderDetail (
OrderDetailId int IDENTITY PRIMARY KEY,
OrderId int REFERENCES [Order](OrderId),
ProductId int REFERENCES Product(ProductId),
Quantity smallint
)
GO
-- This table lacks Primary Key. Combining that with IncludeAllColumns = 0 results in events that
-- have no primary key, which is essentially useless
CREATE TABLE TableWithNoPK (
Id int IDENTITY,
ItemName varchar(50)
)
GO
INSERT INTO TableWithNoPK (ItemName) VALUES
('Camera'),
('Automobile'),
('Oven'),
('Couch')
GO
-- Create a DML trigger on OrderDetail that updates the ItemsInStock column in the Product table
-- based on the Quantity column in the OrderDetail table
CREATE TRIGGER trgUpdateItemsInStock ON OrderDetail AFTER INSERT, UPDATE, DELETE
AS
BEGIN
-- Handle insert
IF EXISTS (SELECT * FROM inserted) AND NOT EXISTS (SELECT * FROM deleted)
UPDATE Product
SET ItemsInStock = p.ItemsInStock - i.Quantity
FROM
Product AS p
INNER JOIN inserted AS i ON p.ProductId = i.ProductId
-- Handle update
ELSE IF EXISTS (SELECT * FROM inserted) AND EXISTS (SELECT * FROM deleted) AND UPDATE(Quantity)
UPDATE Product
SET ItemsInStock = p.ItemsInStock + d.Quantity - i.Quantity
FROM
Product AS p
INNER JOIN inserted AS i ON p.ProductId = i.ProductId
INNER JOIN deleted AS d ON p.ProductId = d.ProductId
-- Handle delete
ELSE IF EXISTS (SELECT * FROM deleted) AND NOT EXISTS (SELECT * FROM inserted)
UPDATE Product
SET ItemsInStock = p.ItemsInStock + d.Quantity
FROM
Product AS p
INNER JOIN deleted AS d ON p.ProductId = d.ProductId
END
GO
-- Add some procs to handle orders
CREATE OR ALTER PROC CreateOrder
@CustomerId int
AS
BEGIN
INSERT INTO [Order](CustomerId, OrderDate)
VALUES (@CustomerId, SYSDATETIME())
SELECT OrderId = SCOPE_IDENTITY()
END
GO
CREATE OR ALTER PROC CreateOrderDetail
@OrderId int,
@ProductId int,
@Quantity smallint
AS
BEGIN
INSERT INTO OrderDetail (OrderId, ProductId, Quantity)
VALUES (@OrderId, @ProductId, @Quantity)
SELECT OrderDetailId = SCOPE_IDENTITY()
END
GO
CREATE OR ALTER PROC DeleteOrder
@OrderId int
AS
BEGIN
BEGIN TRANSACTION
DELETE FROM OrderDetail WHERE OrderId = @OrderId
DELETE FROM [Order] WHERE OrderId = @OrderId
COMMIT TRANSACTION
END
GO
Эта база данных включает:
- Таблицы Customer, Order, OrderDetail и Product.
- Хранимые процедуры для вставки/удаления строк в таблицах Order и OrderDetail.
- Триггер для OrderDetail, который обновляет остатки в Product. Это демонстрирует, что CES также передаёт изменения в таблицах, которые обновляются триггерами, а не только изменения в таблицах, для которых вы выполняете прямые DML-операции.
- Таблицу TableWithNoPK, чтобы проиллюстрировать, почему таблицы без первичных ключей могут быть проблематичными при использовании с CES.
Шаг 3: Настройка CES
После подготовки концентратора событий и базы данных давайте включим CES.
Создание главного ключа базы данных
Вам нужно сохранить токен SAS в SQL Server в качестве учётных данных с областью действия базы данных, а для этого сначала необходимо создать защищённый паролем главный ключ базы данных, чтобы SQL Server мог зашифровать эти учётные данные токена SAS.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'H@rd2Gue$$P@$$w0rd'
Создание учётных данных с областью действия базы данных
Чтобы безопасно сохранить токен SAS в SQL Server, выполните следующую инструкцию (вставьте токен SAS, скопированный в Блокнот, в параметр SECRET — сохраните всю строку целиком).
CREATE DATABASE SCOPED CREDENTIAL SqlCesCredential
WITH
IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = '<ваш токен SAS>'
Включение CES для базы данных
Выполните этот T-SQL, чтобы включить CES для текущей базы данных:
EXEC sys.sp_enable_event_stream
Теперь проверьте, что CES включена:
SELECT * FROM sys.databases WHERE is_event_stream_enabled = 1
Создание группы потоков событий
Группа потоков событий определяет целевой концентратор событий для ваших событий. Обязательно укажите правильные значения для вашего пространства имён и имени концентратора событий в параметре @destination_location:
EXEC sys.sp_create_event_stream_group
@stream_group_name = 'SqlCesGroup',
@destination_location = 'ces-namespace.servicebus.windows.net/ces-hub',
@destination_credential = SqlCesCredential,
@destination_type = 'AzureEventHubsAmqp'
Добавление таблиц в группу потоков событий
Решите, включать ли старые значения и включать ли все столбцы. Каждая таблица в нашем демонстрационном примере использует разные настройки по разным причинам; старые значения и все значения включаются, когда нам нужен этот дополнительный контекст, и исключаются в пользу уменьшения объёма передаваемых данных, когда они не нужны.
-- Customer: полная строка в каждом событии, без старых значений
EXEC sys.sp_add_object_to_event_stream_group
@stream_group_name = 'SqlCesGroup',
@object_name = 'dbo.Customer',
@include_old_values = 0, -- не включать старые значения при обновлениях/удалениях
@include_all_columns = 1 -- включать все столбцы, даже если они не изменились
-- Product: только изменённые столбцы, включать старые значения (важно для разницы в остатках)
EXEC sys.sp_add_object_to_event_stream_group
@stream_group_name = 'SqlCesGroup',
@object_name = 'dbo.Product',
@include_old_values = 1, -- включать старые значения для изменённых столбцов
@include_all_columns = 0 -- включать только изменённые столбцы
-- Order: только изменённые столбцы, включать старые значения (для аудита изменений)
EXEC sys.sp_add_object_to_event_stream_group
@stream_group_name = 'SqlCesGroup',
@object_name = 'dbo.Order',
@include_old_values = 1, -- включать старые значения для изменённых столбцов
@include_all_columns = 0 -- включать только изменённые столбцы
-- OrderDetail: только изменённые столбцы, включать старые значения (обновления количества важны)
EXEC sys.sp_add_object_to_event_stream_group
@stream_group_name = 'SqlCesGroup',
@object_name = 'dbo.OrderDetail',
@include_old_values = 1, -- включать старые значения для изменённых столбцов
@include_all_columns = 0 -- включать только изменённые столбцы
-- TableWithNoPK: демонстрирует ограничения CES без первичного ключа
EXEC sys.sp_add_object_to_event_stream_group
@stream_group_name = 'SqlCesGroup',
@object_name = 'dbo.TableWithNoPK',
@include_old_values = 0, -- без старых значений
@include_all_columns = 0 -- только изменённые столбцы (по сути бесполезно без первичного ключа)
- Customer: Все столбцы включены для сценариев upsert. Старые значения здесь не важны.
- Product: Старые значения необходимы для расчёта разницы в остатках и ценах.
- Order: Старые значения важны для аудита.
- OrderDetail: Старые и новые количества необходимы для последующей корректировки остатков.
- TableWithNoPK: Включена для демонстрации; в реальных сценариях CES требует наличия первичного ключа, чтобы события были полезными.
Проверка CES для каждой таблицы
Выполните следующие инструкции, чтобы подтвердить, что CES включена для всех добавленных в группу таблиц, и просмотреть связанные с каждой таблицей метаданные CES:
EXEC sp_help_change_feed_table @source_schema = 'dbo', @source_name = 'Customer'
EXEC sp_help_change_feed_table @source_schema = 'dbo', @source_name = 'Product'
EXEC sp_help_change_feed_table @source_schema = 'dbo', @source_name = 'Order'
EXEC sp_help_change_feed_table @source_schema = 'dbo', @source_name = 'OrderDetail'
EXEC sp_help_change_feed_table @source_schema = 'dbo', @source_name = 'TableWithNoPK'
Полезная нагрузка CloudEvent
На этом этапе CES полностью настроена! С этого момента все изменения в этих таблицах будут передаваться в ваш концентратор событий, где несколько клиентов смогут потреблять их в реальном времени. Конкретно каждое событие генерируется и передаётся как CloudEvent со следующей структурой JSON:
{
"specversion": "1.0",
"type": "com.microsoft.SQL.CES.DML.V1",
"source": "\/",
"id": "cc3fcdca-09c0-4f46-a8d3-5d0c3c1eb85a",
"logicalid": "8376457a-17af-49f4-b9ea-0d5071f515f4:0000002C000007300011:00000000000000000002",
"time": "2025-06-30T12:29:46.290Z",
"datacontenttype": "application\/avro-json",
"operation": "UPD",
"segmentindex": 1,
"finalsegment": true,
"data": "{\n \"eventsource\": {\n \"db\": \"CesDemo\",\n \"schema\": \"dbo\",\n \"tbl\": \"Product\",\n \"cols\": [\n {\n \"name\": \"ProductId\",\n \"type\": \"int\",\n \"index\": 0\n },\n {\n \"name\": \"ItemsInStock\",\n \"type\": \"smallint\",\n \"index\": 5\n }\n ],\n \"pkkey\": [\n {\n \"columnname\": \"ProductId\",\n \"value\": \"2\"\n }\n ],\n \"transaction\": {\n \"commitlsn\": \"0000002C:00000730:0011\",\n \"beginlsn\": \"0000002C:00000730:000C\",\n \"sequencenumber\": 2,\n \"committime\": \"2025-06-30T12:29:46.290Z\"\n }\n },\n \"eventrow\": {\n \"old\": \"{\\\"ProductId\\\": \\\"2\\\", \\\"ItemsInStock\\\": \\\"8\\\"}\",\n \"current\": \"{\\\"ProductId\\\": \\\"2\\\", \\\"ItemsInStock\\\": \\\"7\\\"}\"\n }\n}"
}
В этом примере свойство operation имеет значение UPD, что указывает на операцию UPDATE в таблице. Также обратите внимание, что свойство data содержит вложенный JSON, который можно распаковать, чтобы получить необходимые детали каждой DML-операции:
{
"eventsource": {
"db": "CesDemo",
"schema": "dbo",
"tbl": "Product",
"cols": [
{
"name": "ProductId",
"type": "int",
"index": 0
},
{
"name": "ItemsInStock",
"type": "smallint",
"index": 5
}
],
"pkkey": [
{
"columnname": "ProductId",
"value": "2"
}
],
"transaction": {
"commitlsn": "0000002C:00000730:0011",
"beginlsn": "0000002C:00000730:000C",
"sequencenumber": 2,
"committime": "2025-06-30T12:29:46.290Z"
}
},
"eventrow": {
"old": "{\"ProductId\": \"2\", \"ItemsInStock\": \"8\"}",
"current": "{\"ProductId\": \"2\", \"ItemsInStock\": \"7\"}"
}
}
Здесь свойство eventsource описывает базу данных, схему, таблицу, столбцы, первичный ключ и транзакцию для события. А свойство eventrow — это ещё один вложенный уровень JSON внутри CloudEvent, который предоставляет фактические значения столбцов, затронутых событием, в виде набора пар ключ/значение.
В Части 2 мы создадим приложение-потребитель, которое будет прослушивать события, распаковывать полезную нагрузку CloudEvent и обрабатывать их в реальном времени.



Комментариев нет:
Отправить комментарий