19.2.24

Агрегат потока (Stream Aggregate)

Автор оригинала: Craig Freedman


По материалам статьи Craig Freedman: Stream Aggregate

Когда мы имеем дело с предложением GROUP BY, SQL Server для вычисления агрегатов использует два оператора. Один из этих операторов - агрегат потока, который, как Вы помните, был рассмотрен в предыдущей статье, и который используется для скалярных агрегатов. Другой оператор, это агрегат хэша (Hash Aggregate). В этой статье, я более подробно рассмотрю то, как работает агрегат потока.

Алгоритм

Агрегат потока использует данные, поступающие ему отсортированными по столбцу (столбцам) группировки. Если группировка выполняется более чем по одному столбцу, мы можем выбрать произвольный порядок сортировки, который использует все эти столбцы. Например, если мы группируем по столбцу "b", мы можем сортировать по "(a, b)" или по "(b, a)". Как и в случае с соединением слиянием, порядок сортировки может быть задан индексом или явным оператором сортировки. Порядок сортировки гарантирует, что наборы строк с одинаковыми значениями для этих столбцов будут смежными друг с другом.

Ниже представлен псевдокод алгоритма агрегата потока:

clear the current   aggregate results
clear the current   group by columns

for each input row
  begin
    if   the input row does not match the current group by columns
        begin
          output the aggregate results
          clear the current aggregate results
          set the current group by columns to the input row
        end
      update the aggregate results with the input row
  end

Например, если мы вычисляем SUM, агрегат потока суммирует каждую строку на входе. Если строка на входе принадлежит текущей группе (то есть, группа столбцов строки на входе соответствует группе столбцов предыдущей строки), мы обновляем поток SUM, прибавляя соответствующее значение из строки на входе пока не будет подсчитана вся сумма. Если строка на входе принадлежит уже другой группе (то есть, группа столбцов строки на входе не соответствует группе столбцов предыдущей строки), мы отправляем поток SUM на выход, сбрасываем SUM (обнуляем), и запускаем в работу новую группу.

Простые примеры

create table   t (a int, b int,   c int)
select sum(c) from t group by a, b

|--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0)

       |                                    THEN NULL ELSE [Expr1011] END))

       |--Stream Aggregate(GROUP BY:([t].[b], [t].[a])

            |              DEFINE:([Expr1010]=COUNT_BIG([t].[c]),

            |              [Expr1011]=SUM([t].[c])))

            |--Sort(ORDER BY:([t].[b] ASC, [t].[a] ASC))

                 |--Table Scan(OBJECT:([t]))

Вы видите практически такой же план, как и в случае со скалярным агрегатом для запроса с SUM. Отличие только в том, что перед соединением данные должны быть отсортированы (как Вы могли догадаться, скалярный агрегат выглядит как одна большая группа, содержащая все строки; таким образом, для скалярного агрегата отпадает необходимость в сортировке строк для разных групп).

Агрегат потока сохраняет порядок сортировки на входе, т.е. если мы запрашиваем упорядочивание для группы столбца или для группы нескольких столбцов, повторная сортировка окажется ненужной:

select sum(c)   from t group by a, b order by a

|--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0)

       |                                   THEN NULL ELSE [Expr1011] END))

       |--Stream Aggregate(GROUP BY:([t].[a], [t].[b])

            |              DEFINE:([Expr1010]=COUNT_BIG([t].[c]),

            |              [Expr1011]=SUM([t].[c])))

            |--Sort(ORDER BY:([t].[a] ASC, [t].[b] ASC))

                 |--Table Scan(OBJECT:([t]))

Обратите внимание, что столбцы сортировки совсем другие, чем в предыдущем примере. До этого мы не заботились о том, как сортировать, по "(a, b)" или по "(b, a)". Теперь, когда запрос содержит предложение ORDER BY, мы должны выбрать для сортировки первый столбец.

Если имеется подходящий индекс, сортировка оказывается вообще ненужной:

create clustered   index tab on   t(a,b)
select sum(c) from t group by a, b

|--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0)

       |                                    THEN NULL ELSE [Expr1011] END))

       |--Stream Aggregate(GROUP BY:([t].[a], [t].[b])

            |              DEFINE:([Expr1010]=COUNT_BIG([t].[c]),

            |              [Expr1011]=SUM([t].[c])))

            |--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

SELECT и DISTINCT

Когда имеется упорядочивающий индекс, агрегат потока может использоваться также и для выборки с DISTINCT (если для упорядочивания используется сортировка, отсортированы будут и сами дубликаты; т.ч. в этом случае отпадает необходимость использования агрегата потока). Выборка без дубликатов, по существу, то же самое, что и группировка без агрегатных функций по всем столбцам выборки. Например:

select distinct   a, b from t

Можно переписать и так:

select a, b from   t group by a,   b

Оба запроса используют один и тот же план исполнения:

|--Stream Aggregate(GROUP BY:([t].[a], [t].[b]))

       |--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

Обратите внимание, что агрегат потока содержит предложение GROUP BY, а не определения столбцов.

Агрегаты с DISTINCT

Рассмотрим такой запрос:

select sum(distinct b) from t group by a

Необходимо устранить дубликаты значений столбца "b" для каждой группы. В моей предыдущей статье был показан способ реализации этого с использованием сортировки без дубликатов (Sort(DISTINCT …). Однако, если имеется подходящий индекс, для устранения дубликатов также может использоваться и агрегат потока:

|--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1010]=(0)

       |                                    THEN NULL ELSE [Expr1011] END))

       |--Stream Aggregate(GROUP BY:([t].[a])

            |              DEFINE:([Expr1010]=COUNT_BIG([t].[b]),

            |              [Expr1011]=SUM([t].[b])))

            |--Stream Aggregate(GROUP BY:([t].[a], [t].[b]))

                 |--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

Самый нижний в дереве плана агрегат потока устраняет дубликаты, в то время как самый верхний исполняет саму агрегацию.

Несколько DISTINCT

В заключение рассмотрим такой запрос:

select sum(distinct b), sum(distinct c) from t group by a

|--Merge Join(Inner Join, MANY-TO-MANY MERGE:([t].[a])=([t].[a]),

       |                    RESIDUAL:([t].[a] = [t].[a]))

       |--Compute Scalar(DEFINE:([t].[a]=[t].[a]))

       |    |--Compute Scalar(DEFINE:([Expr1005]=CASE WHEN [Expr1018]=(0)

       |         |                                    THEN NULL ELSE [Expr1019] END))

       |         |--Stream Aggregate(GROUP BY:([t].[a])

       |              |              DEFINE:([Expr1018]=COUNT_BIG([t].[c]),

       |              |              [Expr1019]=SUM([t].[c])))

       |              |--Sort(DISTINCT ORDER BY:([t].[a] ASC, [t].[c] ASC))

       |                   |--Clustered Index Scan(OBJECT:([t].[tab]))

       |--Compute Scalar(DEFINE:([t].[a]=[t].[a]))

            |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1020]=(0)

                 |                                    THEN NULL ELSE [Expr1021] END))

                 |--Stream Aggregate(GROUP BY:([t].[a])

                      |              DEFINE:([Expr1020]=COUNT_BIG([t].[b]),

                      |              [Expr1021]=SUM([t].[b])))

                      |--Stream Aggregate(GROUP BY:([t].[a], [t].[b]))

                           |--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

Как и в случае с описанным в моей предыдущей статье примером нескольких скалярных агрегатов с DISTINCT, этот запрос будет разбит в две части - по одной для каждого набора неповторяющихся значений. Обратите внимание, что вычисление "sum(distinct c)" требует сортировки с DISTINCT по столбцу "c", в то время как вычисление "sum(distinct b)" использует упорядоченное сканирование кластеризованного индекса и агрегат потока, как это видно в представленном выше плане исполнения запроса. Эти два набора сумм соединяются при группировке по столбцу "a", что и даёт желаемый результат. Так как два входных набора уже отсортированы по столбцу группировки, может использоваться соединение слиянием (оператор скалярного вычисления "[t].[a] = [t].[a]" необходим для внутренних целей и его можно игнорировать).

Соединение слиянием должно выполняться по схеме "один ко многим" а не "многие ко многим" (MANY-TO-MANY), так как агрегаты гарантируют уникальность по столбцу группировки (и столбцу предиката соединения). Это скорее проблема производительности, а не проблема правильности результата. Если переписать изначальный запрос в виде явного соединения, мы действительно получим соединение "один ко многим":

select sum_b, sum_c
from
  (select a, sum(distinct b) as sum_b from t group by a) r
  join
  (select a, sum(distinct c) as sum_c from t group by a) s
  on r.a = s.a

|--Merge Join(Inner Join, MERGE:([t].[a])=([t].[a]), RESIDUAL:([t].[a]=[t].[a]))

       |--Compute Scalar(DEFINE:([Expr1009]=CASE WHEN [Expr1020]=(0)

       |    |                                    THEN NULL ELSE [Expr1021] END))

       |    |--Stream Aggregate(GROUP BY:([t].[a])

       |         |              DEFINE:([Expr1020]=COUNT_BIG([t].[c]),

       |         |              [Expr1021]=SUM([t].[c])))

       |         |--Sort(DISTINCT ORDER BY:([t].[a] ASC, [t].[c] ASC))

       |              |--Clustered Index Scan(OBJECT:([t].[tab]))

       |--Compute Scalar(DEFINE:([Expr1004]=CASE WHEN [Expr1022]=(0)

            |                                    THEN NULL ELSE [Expr1023] END))

            |--Stream Aggregate(GROUP BY:([t].[a])

                 |              DEFINE:([Expr1022]=COUNT_BIG([t].[b]),

                 |              [Expr1023]=SUM([t].[b])))

                 |--Stream Aggregate(GROUP BY:([t].[a], [t].[b]))

                      |--Clustered Index Scan(OBJECT:([t].[tab]), ORDERED FORWARD)

Далее…

В следующей статье я напишу о другом операторе агрегации, об агрегате хэша.

Комментариев нет:

Отправить комментарий