5.2.26

Новое в SQL Server 2025: Change Event Streaming (Часть 2: Собирание событий)

Автор: Leonard Lobel, Getting Started with Change Event Streaming in SQL Server 2025 (Part 2: Consuming Events)

Во второй части статьи о новой функции потоковой передачи событий изменений (Change Event Streaming, CES) в SQL Server 2025 я покажу, как собираются события, генерируемые CES. В Части 1 мы подготовили концентратор событий Azure, сгенерировали токен SAS для доступа к нему, создали демонстрационную базу данных CesDemo и включили CES в базе данных. Затем мы добавили таблицы в группу потоков событий, специально подбирая параметры @include_old_values и @include_all_columns. Теперь CES передаёт DML-изменения (операции вставки, обновления и удаления) из этих таблиц в концентратор событий.

Примечание: Эта статья основана на SQL Server 2025 CTP 2.1. Синтаксис и поведение могут претерпеть незначительные изменения к моменту выпуска продукта. Потоковая передача событий изменений (CES) в конечном итоге будет поддерживаться во всех редакциях SQL Server, включая SQL Server 2025 для Windows, SQL Server 2025 для Linux, Azure SQL Database и Managed Instance.

Теперь мы готовы создать клиентское приложение для потребления сгенерированных событий. Но прежде чем начать писать код, давайте установим контекст, чтобы шаги были понятны.

Во-первых, CES лишь записывает данные в концентраторы событий (Event Hubs). Она не знает (и её это не волнует), кто слушает. Задача вашего клиентского приложения (или приложений) — впоследствии потреблять эти события. Наше демонстрационное приложение на C# будет использовать клиентский SDK концентраторов событий (а именно EventProcessorClient) для прослушивания событий.

Каждому клиенту CES нужно где-то записывать прогресс обработки событий. Это называется контрольной точкой (checkpoint), которая работает как «закладка». Используя контрольные точки, клиентские приложения могут останавливаться, а затем возобновлять работу с того места, где они остановились, не обрабатывая повторно уже обработанные события. SDK использует для этого хранилище BLOB-объектов Azure (Azure Blob Storage).

Вы также встретите термин группа потребителей (consumer group). Представьте группу потребителей как «представление» потока с собственной контрольной точкой. Используя несколько групп потребителей (по одной на клиентское приложение), каждое приложение может поддерживать собственную контрольную точку для отметки своего места в потоке событий. Тарифный план Basic позволяет использовать только одну группу потребителей. Переход на (и оплата) более высокого тарифа, чем Basic, позволит вам управлять несколькими клиентскими приложениями, которые одновременно потребляют события из одного концентратора событий, каждое в своём собственном темпе, не мешая друг другу.

Создание контейнера хранилища BLOB-объектов

Вам понадобится контейнер BLOB-объектов в хранилище Azure, чтобы клиентский SDK концентраторов событий мог управлять контрольными точками для ваших групп потребителей.

Создание учётной записи хранения

Контейнер BLOB-объектов существует внутри учётной записи хранения. Чтобы создать новую учётную запись хранения:

  1. На портале Azure создайте новый ресурс.
  2. В Marketplace создайте новый ресурс «Учётная запись хранения».
  3. Укажите имя для новой учётной записи хранения в новой или существующей группе ресурсов (дефисы не допускаются).
  4. Для основной службы выберите «Хранилище BLOB-объектов Azure» или «Azure Data Lake Storage 2-го поколения».
  5. Для избыточности выберите «Локально избыточное хранилище (LRS)» (достаточно для разработки и тестирования).
  6. Нажмите «Просмотр и создание», а затем «Создать».

Создание контейнера BLOB-объектов

Теперь вы можете создать новый контейнер BLOB-объектов в новой учётной записи хранения:

  1. В разделе «Хранение данных» слева нажмите «Контейнеры».
  2. Нажмите + Добавить контейнер.
  3. Укажите имя для нового контейнера.
  4. Нажмите «Создать».

Теперь получите строку подключения для учётной записи хранения:

  1. В разделе «Безопасность и сеть» слева нажмите «Ключи доступа».
  2. Нажмите «Показать» в строке «Строка подключения» для ключа key1.
  3. Нажмите значок «Копировать», чтобы скопировать строку подключения в буфер обмена.
  4. Вставьте строку подключения в Блокнот; она понадобится для конфигурации клиентского приложения.

Создание проекта Visual Studio

Хорошо, мы готовы к работе. Мы создадим нашего клиента-потребителя в виде простого консольного приложения, сосредоточив внимание на подключении потока, десериализации событий и отображении происходящего.

Примечание: Потребители CES также могут быть созданы с помощью Функций Azure (я расскажу об этом в следующей статье). Функции Azure скрывают большую часть шаблонного кода с помощью триггера концентраторов событий, работают без сервера и автоматически масштабируются. В отличие от этого, создание клиента «вручную», как мы делаем здесь, даёт вам максимальный контроль над поведением подключения, пакетной обработкой, политиками повторных попыток и диагностикой.

Давайте начнём!

Запустите Visual Studio 2022. Затем выберите «Создать новый проект» и выберите «Консольное приложение (C#)». Назовите проект CESClient, нажмите «Далее», а затем «Создать».

Установка пакетов NuGet

Сначала нам понадобятся три пакета NuGet для поддержки нашего приложения. Щёлкните правой кнопкой мыши проект CESClient и выберите «Управление пакетами NuGet». Перейдите на вкладку «Обзор», а затем найдите и установите следующие пакеты:

  • Azure.Messaging.EventHubs.Processor
    • Включает клиент и процессор концентраторов событий, а также хранилище BLOB-объектов Azure для поддержки контрольных точек.
  • Microsoft.Extensions.Configuration.Json
    • Поддерживает внешнюю конфигурацию в appsettings.json вместо использования жёстко заданной конфигурации.
  • Newtonsoft.Json
    • Позволяет десериализовать полезную нагрузку CloudEvent, полученную из концентратора событий, которая предоставляется в формате JSON.

Добавление файла конфигурации

Теперь создайте файл appsettings.json, в котором мы будем хранить нашу конфигурацию. Это включает в себя данные подключения и секреты (токен SAS, строку подключения к BLOB-объекту).

  1. Щёлкните правой кнопкой мыши проект и выберите «Добавить» > «Создать элемент».
  2. Назовите файл appsettings.json.
  3. Замените его содержимое на:
{
  "EventHub": {
    "HostName": "ces-namespace.servicebus.windows.net",
    "Name": "ces-hub",
    "SasToken": "вставьте-ваш-токен-sas-здесь"
  },
  "BlobStorage": {
    "ConnectionString": "вставьте-вашу-строку-подключения-blob-здесь",
    "ContainerName": "ces-checkpoint"
  }
}
  1. Для свойства EventHub обратите внимание, что свойство HostName указывает имя нашего пространства имён концентратора событий ces-namespace в качестве префикса имени хоста, свойство Name указывает имя нашего концентратора событий ces-hub, а свойство SasToken содержит токен SAS, сгенерированный для доступа к концентратору событий. Все три эти значения были установлены во время настройки в Части 1.
  2. Для свойства BlobStorage вставьте значения для ConnectionString и ContainerName для контейнера BLOB-объектов хранилища Azure, который вы только что создали.
  3. Чтобы этот файл копировался в выходной каталог при сборке проекта, щёлкните appsettings.json на панели обозревателя решений. Затем в панели свойств установите для параметра «Копировать в выходной каталог» значение «Копировать, если новее».

Добавление кода

Теперь добавьте следующий код в Program.cs:

using Azure;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Storage.Blobs;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

namespace CESClient
{
  public class Program
  {
    private static int _eventCount;

    // Добавьте методы здесь
  }
}

Этот код импортирует все пространства имён, на которые мы будем ссылаться, и определяет приватное поле как простой счётчик событий, который мы будем увеличивать с каждым полученным событием.

Теперь подключите метод Main:

public static async Task Main(string[] args)
{
  // Приветствие
  Console.WriteLine("SQL Server 2025 Change Event Streaming Client");
  Console.WriteLine();
  Console.Write("Initializing... ");

  // Загружаем конфигурацию из appsettings.json
  var config = new ConfigurationBuilder()
      .SetBasePath(Directory.GetCurrentDirectory())
      .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
      .Build();

  // Создаём клиента контейнера BLOB-объектов, который процессор событий будет использовать для контрольных точек
  var blobStorageConnectionString = config["BlobStorage:ConnectionString"];
  var blobStorageContainerName = config["BlobStorage:ContainerName"];

  var storageClient = new BlobContainerClient(blobStorageConnectionString, blobStorageContainerName);

  // Создаём клиент-процессор событий для обработки событий в концентраторе событий
  var eventHubHostName = config["EventHub:HostName"];
  var eventHubName = config["EventHub:Name"];
  var sasToken = config["EventHub:SasToken"];

  var processor = new EventProcessorClient(
      storageClient,                                        // хранилище контрольных точек
      EventHubConsumerClient.DefaultConsumerGroupName,      // Basic тариф: одна группа потребителей (например, $Default)
      eventHubHostName,
      eventHubName,
      new AzureSasCredential(sasToken)
  );

  // Регистрируем обработчики для обработки событий и ошибок
  processor.ProcessEventAsync += ProcessEventHandler;
  processor.ProcessErrorAsync += ProcessErrorHandler;

  // Начинаем прослушивать события
  Console.Write("starting... ");
  _eventCount = 0;

  await processor.StartProcessingAsync();

  Console.WriteLine("waiting... press any key to stop.");
  Console.ReadKey(intercept: true);

  // Прекращаем прослушивать события
  await processor.StopProcessingAsync();

  Console.WriteLine("Stopped");
}

Этот код загружает конфигурацию из appsettings.json, создаёт клиента контейнера BLOB-объектов (для сохранения контрольных точек), запускает процессор концентраторов событий с группой потребителей по умолчанию и присоединяет обработчики для обработки событий и ошибок. Наконец, он чисто запускается и останавливается, когда пользователь нажимает любую клавишу.

Обработка событий

Теперь добавьте метод ProcessEventHandler. Этот метод сначала анализирует внешнюю оболочку CloudEvent, затем внутреннюю полезную нагрузку, выводит полезные метаданные и направляет события в обработчики вставки/обновления/удаления. Наконец (и это критически важно), он обновляет контрольную точку, чтобы при перезапуске работа возобновилась со следующего события. (Структуру полезной нагрузки CloudEvent я объяснял в Части 1).

private static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
  try
  {
    // Десериализуем данные события
    using var doc = JsonDocument.Parse(eventArgs.Data.Body.ToArray());
    var root = doc.RootElement;
    var dataJson = root.GetProperty("data");

    using var innerDoc = JsonDocument.Parse(dataJson.GetString());
    var data = innerDoc.RootElement;

    Console.WriteLine($"Processing event... #{++_eventCount}");

    // Десериализуем поля "current" и "old" в свойстве eventrow данных события в словари
    var operation = root.GetProperty("operation").GetString();
    var cols = data.GetProperty("eventsource").GetProperty("cols").EnumerateArray();
    var current = JsonSerializer.Deserialize>(data.GetProperty("eventrow").GetProperty("current").GetString());
    var old = JsonSerializer.Deserialize>(data.GetProperty("eventrow").GetProperty("old").GetString());

    DisplayEventMetadata(eventArgs, root, data);

    switch (operation)
    {
      case "INS":
        ProcessInsert(cols, current);
        break;
      case "UPD":
        ProcessUpdate(cols, current, old);
        break;
      case "DEL":
        ProcessDelete(cols, old);
        break;
    }

    Console.WriteLine();
    Console.WriteLine(new string('-', 80));
    Console.WriteLine();

    // Сохраняем прогресс, чтобы не обрабатывать это событие повторно при перезапуске
    await eventArgs.UpdateCheckpointAsync();
  }
  catch (Exception ex)
  {
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine(ex.Message);
    Console.ResetColor();
  }
}

Отображение метаданных события

Этот метод выводит краткий «дамп контекста» для каждого события: сначала он выводит порядковый номер и смещение из ProcessEventArgs, чтобы вы могли точно определить местоположение события в концентраторе событий (полезно для упорядочивания и повторного воспроизведения). Затем он показывает ключевые поля CloudEvent из внешней оболочки: версию спецификации, тип события, DML-операцию (INS, UPD, DEL), метку времени, уникальный ID, логический ID и тип содержимого данных. Наконец, он углубляется во внутреннюю полезную нагрузку CES, чтобы показать базу данных, схему и таблицу, создавшую событие. Вместе эти детали позволяют легко сопоставить то, что вы видите в консоли, с источником отправки и устранять такие проблемы, как неожиданные операции или несоответствия схем.

private static void DisplayEventMetadata(ProcessEventArgs eventArgs, JsonElement root, JsonElement data)
{
  Console.WriteLine("Event Args");
  Console.WriteLine($"  Sequence:Offset => {eventArgs.Data.SequenceNumber}:{eventArgs.Data.Offset}");
  Console.WriteLine();
  Console.WriteLine("Event Data");
  Console.WriteLine($"  Spec version:       {root.GetProperty("specversion").GetString()}");
  Console.WriteLine($"  Operation:          {root.GetProperty("type").GetString()}");
  Console.WriteLine($"  Time:               {root.GetProperty("time").GetString()}");
  Console.WriteLine($"  Event ID:           {root.GetProperty("id").GetString()}");
  Console.WriteLine($"  Logical ID:         {root.GetProperty("logicalid").GetString()}");
  Console.WriteLine($"  Operation:          {root.GetProperty("operation").GetString()}");
  Console.WriteLine($"  Data content type:  {root.GetProperty("datacontenttype").GetString()}");
  Console.WriteLine();
  Console.WriteLine("Data");
  Console.WriteLine($"  Database:           {data.GetProperty("eventsource").GetProperty("db").GetString()}");
  Console.WriteLine($"  Schema:             {data.GetProperty("eventsource").GetProperty("schema").GetString()}");
  Console.WriteLine($"  Table:              {data.GetProperty("eventsource").GetProperty("tbl").GetString()}");
  Console.WriteLine();
}

Обработка вставок

Для операций вставки полное «последующее» изображение проще всего для чтения и является быстрым способом проверки настройки @include_all_columns, которую мы установили в Части 1.

private static void ProcessInsert(JsonElement.ArrayEnumerator cols, Dictionary current)
{
  Console.WriteLine("Operation: Insert");
  Console.ForegroundColor = ConsoleColor.Green;

  foreach (var col in cols)
  {
    var name = col.GetProperty("name").GetString();
    Console.WriteLine($"\t{name}: {current[name]}");
  }

  Console.ResetColor();
}

Обработка обновлений

Для таблиц, где мы включили @include_old_values в Части 1, вы получите отличное бок о бок представление; в противном случае вы увидите только «последующее» состояние.

private static void ProcessUpdate(JsonElement.ArrayEnumerator cols, Dictionary current, Dictionary old)
{
  Console.WriteLine("Operation: Update");

  foreach (var col in cols)
  {
    var name = col.GetProperty("name").GetString();

    if (old.Count > 0 && current[name] != old[name])
    {
      Console.ForegroundColor = ConsoleColor.Yellow;
      Console.WriteLine($"\t{name}: {current[name]} (old: {old[name]})");
      Console.ResetColor();
    }
    else
    {
      Console.WriteLine($"\t{name}: {current[name]}");
    }
  }
}

Обработка удалений

Удаления имеют только «предыдущее» состояние, которое полезно для аудита и сверки.

private static void ProcessDelete(JsonElement.ArrayEnumerator cols, Dictionary old)
{
  Console.WriteLine("Operation: Delete");
  Console.ForegroundColor = ConsoleColor.Red;

  foreach (var col in cols)
  {
    var name = col.GetProperty("name").GetString();
    Console.WriteLine($"\t{name}: {old[name]}");
  }

  Console.ResetColor();
}

Обработка ошибок

Наконец, нам нужно обрабатывать ошибки, не завершая работу приложения. Если произойдёт ошибка, этот метод отображает детали исключения. Конечно, в реальном сценарии потребуется правильная обработка ошибок; например, сохранение деталей события в очередь для автоматической повторной попытки или ручного вмешательства.

private static Task ProcessErrorHandler(ProcessErrorEventArgs e)
{
  Console.ForegroundColor = ConsoleColor.Red;
  Console.WriteLine(e.Exception.Message);
  Console.ResetColor();
  return Task.CompletedTask;
}

Запуск приложения

Вот и момент истины! Запустите приложение. Окно консоли клиента должно открыться, и вы должны увидеть:

SQL Server 2025 Change Event Streaming Client

Initializing... starting... waiting... press any key to stop.

Если появляются ошибки, проверьте значения конфигурации и установку пакетов NuGet.

Генерация и мониторинг событий изменений

Давайте выполним операции вставки, обновления, удаления, а также изменения, вызванные триггерами, на основе схемы базы данных, которую мы настроили в Части 1. Это позволит нам наблюдать за захватом событий в реальном времени и изучать полезные нагрузки CloudEvent по мере их получения.

Запустите SSMS и откройте окно запроса к базе данных CesDemo. Затем расположите окна SSMS и клиентской консоли рядом. Таким образом, вы сможете изучать события в окне клиентской консоли по мере их генерации из окна SSMS.

Создание заказа

В SSMS запустите хранимую процедуру для создания нового заказа:

EXEC CreateOrder @CustomerId = 1

В окне клиентской консоли вы должны увидеть событие INS для таблицы Order.

Создание деталей заказа

Теперь добавьте две детали к заказу:

EXEC CreateOrderDetail @OrderId = 1, @ProductId = 1, @Quantity = 2
EXEC CreateOrderDetail @OrderId = 1, @ProductId = 2, @Quantity = 1

Ожидайте два соответствующих события INS для OrderDetail. И поскольку триггер OrderDetail корректирует Product.ItemsInStock, также ожидайте события UPD для Product, отражающие уменьшение запасов (2 для продукта 1; 1 для продукта 2).

Удаление заказа

Теперь вызовите хранимую процедуру, которая удаляет весь заказ вместе с деталями заказа.

EXEC DeleteOrder @OrderId = 1

Ожидайте события DEL для заказа и его деталей, а также события UPD для Product по мере восстановления запасов.

Обновление клиента

Теперь изменим город клиента на Чикаго:

UPDATE Customer SET City = 'Chicago' WHERE CustomerId = 1

Ожидайте событие UPD для Customer. Напомним, что в Части 1 мы решили не включать старые значения для этой таблицы, поэтому вы увидите только «последующие» значения.

Массовое обновление продуктов

Давайте применим скидку 20% на цену всех камер:

UPDATE Product SET UnitPrice = UnitPrice * 0.8 WHERE Category = 'Camera'

Ожидайте события UPD для каждой подходящей строки. В Части 1 мы включили старые значения и только изменённые столбцы для Product, поэтому вы увидите старую и новую UnitPrice (и первичный ключ, который всегда включается).

Обновление таблицы без первичного ключа

Этот последний пример демонстрирует необходимость всегда определять первичный ключ в таблице:

UPDATE TableWithNoPK SET ItemName = 'Stove' WHERE Id = 3

Ожидайте событие UPD без ключевых столбцов. Таким образом, вы увидите, что название элемента было изменено на Stove в какой-то строке, но вы не будете знать, в какой именно строке, что делает эти данные события бесполезной информацией.

Заключение

На этом всё! В этой серии из двух частей вы успешно создали сквозной конвейер CES. Сначала вы настроили SQL Server 2025 для потоковой передачи изменений в концентраторы событий Azure с соответствующими учётными данными SAS, группой потоков событий и настройками таблиц. Затем вы создали потребителя на C#, который считывает полезные нагрузки, обёрнутые в CloudEvent, и использует хранилище BLOB-объектов Azure для контрольных точек. Наша демонстрация работала с группой потребителей по умолчанию (тариф Basic) для одного приложения, но более высокие тарифы поддерживают несколько групп для нескольких независимых клиентов. Теперь у вас есть чистый, работающий в реальном времени путь от изменений в базе данных к полезным событиям!



Комментариев нет:

Отправить комментарий