User-defined Aggregate CLR Functions


All versions of SQL Server have supported built-in aggregate functions such as Count(), Sum(), and Max(). SQL Server 2005 introduces user-defined aggregate CLR functions. They are categorized by usage as (scalar) functions. However, their implementation makes them more like UDTs (and just as complex), so I am introducing them here.

Structure of Managed Aggregates

Aggregates are defined as structures (structs) or classes that must implement specific interfaces so that the compiler can convert them to aggregate functions in SQL Server 2005. Managed aggregates must contain the SqlUserDefinedAggregate attribute, an interface for aggregating values in the form of four aggregation methods, and the IBinarySerialize interface for storing intermediate states of aggregation.

Aggregation Methods

The interface that CLR will use to aggregate values consists of four methods:

  • Init()

  • Accumulate()

  • Merge()

  • Terminate()

The Init() method is used by the query processor to initialize computation of the aggregation:

 Public Sub Init()     arr = New Double(100) {}     count = 0 End Sub 

It should also perform any cleanup that is needed to start the processing of a set of values.

Accumulate() is called for each value of the set that is being aggregated. It updates the total of the aggregate instance with the value that is used as an argument. The data type of the parameter should be a managed SQL Server data type equivalent to a native SQL Server data type that will be used in the Create Aggregate statement, or it could also be a managed UDT:

 Public Sub Accumulate(ByVal value As SqlDouble)     arr(count) = value.Value     count = count + 1 End Sub 

Merge() is used by the query processor to put together another instance of aggregation (processing one subset of values) with the current instance (processing its own subset of values). This is used when the query processor divides the original set into multiple subsets for parallel processing. The data type of its parameter must be the aggregation class itself:

 Public Sub Merge(ByVal group As MedianFloat)     For Each mem As Double In group.arr         arr(count) = mem         count = count + 1     Next mem End Sub 

The Terminate() function performs the final calculations and returns the result to the caller:

 Public Function Terminate() As SqlDouble    Array.Resize(arr, count)    Array.Sort(arr)    Dim n As Integer = count \ 2    If n * 2 = count Then         median = (arr(n + 1) + arr(n)) / 2.0    Else         median = arr(n + 1)    End If    Return median  End Function 

IBinary Serialize Interface

The IBinarySerialize interface, with Read() and Write() methods, is used for storing intermediate states of the aggregate instance. The aggregate class must also be preceded with the Serializable attribute and the class must be marked to implement the IBinarySerialize interface:

   <Serializable()> _ ...     Public Class MedianFloat      Implements IBinarySerialize ...     Public Sub Read(ByVal r As BinaryReader) _               Implements IBinarySerialize.Read       count = r.ReadInt32(}       arr = New Double(count) {}       For i As Integer = 1 To count - 1           arr(i) = r.ReadDouble()       Next i    End Sub Public Sub Write(ByVal w As BinaryWriter) _            Implements IBinarySerialize.Write      w.Write(count)      For Each m As Double In arr          w.Write(m)      Next m  End Sub 

The format of a stream or a file that stores the intermediate state that I have selected is very simple. It consists of an integer that contains a count of elements in the array and then individual elements of the array.

SqlUserDefinedAggregate Attribute and Its Properties

Like all other managed database objects, a class of aggregates must be preceded by an argument that specifies its type—SqlUserDefinedAggregate. It also contains a set of properties of the attribute:

Format

Specifies how the class will be serialized. Possible values are Format.UserDefined and Format.Native.

IsNulllfEmpty

Forces the query processor to return a Null value if the aggregate is applied to an empty set.

IslnvariantToNulls

Notifies the query processor that Null values have no effect on the result of the aggregate.

IslnvariantToDuplicates

Notifies the query optimizer that duplicates should not affect the result of the aggregate.

IslnvariantToOrder

Has no effect in SQL Server 2005.

MaxByteSize

Specifies the maximum size of the attribute instance.

When you put elements from the previous sections together, the aggregate looks like this:

 Imports System Imports System.Data Imports System.Data.SqlClient Imports System.Data.SqlTypes Imports Microsoft.SqlServer.Server Imports System.Collections Imports System.IO Imports System.Text <Serializable()> _ <SqlUserDefinedAggregate( _    Format.UserDefined, _    IslnvariantToDuplicates:=True, _    IslnvariantToNulls:=True, _    IslnvariantToOrder:=True, _    IsNullIfEmpty:=True, _    MaxByteSize:=8000) > _  Public Class MedianFloats        Implements IBinarySerialize        Public Sub Init()            arr = New Double(100) {}            count = 0        End Sub        Public Sub Accumulate(ByVal value As SqlDouble)            arr(count) = value.Value            count = count + 1        End Sub        Public Sub Merge(ByVal group As MedianFloat)            For Each mem As Double In group.arr                arr(count) = mem                count = count + 1            Next mem        End Sub        Public Function Terminate() As SqlDouble            Array.Resize(arr, count)            Array.Sort(arr)            Dim n As Integer = count \ 2            If n * 2 = count Then                median = (arr(n + 1) + arr(n)) / 2.0            Else                median = arr(n + 1)            End If            Return median        End Function        Public Sub Read(ByVal r As BinaryReader) _                   Implements IBinarySerialize.Read            count = r.Readlnt32()            arr = New Double(count) {}        For i As Integer = 1 To count - 1            arr(i) = r.ReadDouble()        Next i    End Sub    Public Sub Write(ByVal w As BinaryWriter) _               Implements IBinarySerialize.Write       w.Write(count)       For Each m As Double In arr           w.Write(m)       Next m End Sub      ' field members      Private median As Double      Public arr() As Double      Public count As Integer End Class 

Or in C#, a similar aggregate function would look like this:

 using System; using System.Data.Sql; using System.Data.SqlTypes; using Microsoft.SqlServer.Server; using Systern.Runtime.InteropServices; using System.Collections; using System.IO; using System.Text; namespace MyAggs {     [Serializable]     [SqlUserDefinedAggregate(         Format.UserDefined,         IsInvariantToNulls = true,         IsInvariantToDuplicates = false,         MaxByteSize = 8000}      ] public class MedianDouble : IBinarySerialize {     private double [] arr;     double median;     private int count = 0;     public void Init()     {        arr = new double[100];     }     public void Accumulate(SqlDouble value)     {           arr[count] = value.Value;           count++;     }     public void Merge(MedianDouble group)     {           foreach (double mem in group.arr)           {               arr[count] = mem;               count++;           }      }      public SqlDouble Terminate()      {          Array.Resize(ref arr, count);          Array.Sort(arr);          int n = count / 2;          if (n * 2 == count)              median = (arr[n + 1] + arr[n]) / 2;          else              median = arr[n + 1];           return median;           }       public void Read(BinaryReader r)       {          count = r.ReadInt32();          arr = new double[count];          for (int i = 1; i < count; i++)               arr[i] = r.ReadDouble();        }        public void Write(BinaryWriter w)        {           w.Write(count);           foreach (double m in arr)               w.Write(m);       }    } } 

Deploying CLR Aggregates

As with other CLR database objects, you can deploy CLR aggregates using Visual Studio 2005 or using a custom script:

 If exists(select * from sys.assembly_modules           where assembly_class = 'VbAgg.MedianFloat')     drop AGGREGATE MedianFloat GO IF EXISTS (select * from sys.assemblies where name = N'VbAgg') DROP ASSEMBLY VbAgg; GO CREATE ASSEMBLY Aggregates FROM 'C:\Projects\VbAgg\VbAgg\bin\debug\VbAgg.dll' WITH permission_set=Safe; GO CREATE AGGREGATE MedianFloat(@input float) RETURNS float EXTERNAL NAME [VbAgg].[MyAggs.MedianFloat]; GO 

Using CLR Aggregates

You can use managed aggregates on any set of data on which you can use built-in aggregates. The easiest example would be to create a table and the aggregate function on it:

 if object_id('Data') is not null      drop table dbo.Data create table dbo.Data(id int identity, num real) INSERT INTO dbo.Data (num) VALUES (-911.5); INSERT INTO dbo.Data (num) VALUES (9.6); INSERT INTO dbo.Data (num) VALUES (91.88); INSERT INTO dbo.Data (num) VALUES (509.86); INSERT INTO dbo.Data (num) VALUES (-911.5); INSERT INTO dbo.Data (num) VALUES (90.6); INSERT INTO dbo.Data (num) VALUES (-1.88); INSERT INTO dbo.Data (num) VALUES (19.86); INSERT INTO dbo.Data (num) VALUES (18888.86); select * from Data order by numa select dbo.MedianFloat(num) MedianFloat from Data 

The result of last two lines is

 id            num ------------- ------------- 1             -911.5 5             -911.5 7             -1.88 2             9.6 8             19.86 6             90.6 3             91.88 4             509.8 6             18888.86 (9 row(s) affected) MedianFloat 19.86 (1 row(s) affected) MedianFloat ----------- 19.86 (1 row(s) affected) 

The following example is based on cudt_PriceDate, which was created earlier in the chapter. It calculates the moving average price in the window of a predefined set of days.

 using System; using System.Data.Sql; using System.Data.SqlTypes; using Microsoft.SqlServer.Server; using System.IO; using System.Text; using MyUDTs; namespace MyAggs {    [Serializable]    [SqlUserDefinedAggregate(        Format.UserDefined,        IsInvariantToNulls = true,        IsInvariantToDuplicates = false,        IsInvariantToOrder = false,        MaxByteSize = 8000}      ]    public class agg MovingAvg : IBinarySerialize      {          private double sum = 0;          private DateTime startDt;          private DateTime endDt;          private double ma;          private Int32 count = 1;          public static readonly int daysNum = 50;          #region Aggregation Methods          /// <summary>          /// Initialize the internal data structures          /// </summary>          public void Init()          {              startDt = DateTime .Today;              endDt = startDt.AddDays(-1 * daysNum);          }          /// <summary>          /// Accumulate the next value, not if the value is null          /// </summary>          /// <param name="value">Another value to be aggregated</param>          public void Accumulate(cudt_PriceDate value)          {             if (value.IsNull)              {                  return;              }              if (value.BusinessDay > endDt && value.BusinessDay < this.startDt)              {                  sum += (double)value.StockPrice;                  count++;              }          }          /// <summary>          /// Merge the partially computed aggregate with this aggregate          /// </summary>          /// <param name="other">Another set of data to be added</param>          public void Merge(agg_MovingAvg other)          {              sum += other.sum;              count += other.count;          }          /// <summary>          /// Called at the end of aggregation          /// to return the results of the aggregation.          /// </summary>          /// <returns>the aggregated value</returns>          public SqlDouble Terminate()          {             ma = sum / count;             return new SqlDouble(ma);      }      #endregion      #region IBinarySerialize      public void Read(BinaryReader r)      {          sum = r.ReadDouble(};          count = r.ReadInt32(};      }      public void Write (Binary-Writer w)      {          w.Write (sum) ;          w.Write(count);      }      #endregion   } } 

To test the new aggregation function, I will create a table based on the UDT. I will insert a couple of records in it and then perform the aggregation.

 if object_id('dbo.Stock') is not null    drop table dbo.Stock create table dbo.Stock(    id int identity,    stock varchar(100),    priceClose cudt_PriceDate) INSERT INTO  dbo.Stock(stock, priceClose) values('NT',  Cast('3.05,-2-Dec-05' as cudt_PriceDate)) INSERT INTO   dbo.Stock(stock, priceClose) values('NT',  Cast('3.08;l-Dec-05' as cudt_PriceDate)) INSERT INTO   dbo.Stock(stock, priceClose) values('NT',  Cast('2.90;30-NOV-05' as cudt_PriceDate)) INSERT INTO   dbo.Stock(stock, priceClose) values('NT',  Cast('2.84;29-Nov-05' as cudt_PriceDate)) INSERT INTO   dbo.Stock(stock, priceClose) values('NT',  Cast('2.92;28-Nov-05' as cudt_PriceDate)) INSERT INTO   dbo.Stock(stock, priceClose) values('NT',  Cast('3.01;25-Nov-05' as cudt_PriceDate)) INSERT INTO   dbo.Stock(stock, priceClose) values('NT',  Cast('3.06;23-Nov-05' as cudt_PriceDate)) INSERT INTO   dbo.Stock(stock, priceClose) values('NT',  Cast('3.04;22-Nov-05' as cudt_PriceDate)) INSERT INTO  dbo.Stock(stock, priceClose) values('NT', Cast('5.10;21-Nov-05' as cudt_PriceDate)) select    Stock,    dbo.agg_MovingAvg(priceClose) MovingAvg from Stock group by stock 

The result will be

 Stock    MovingAvg -------- ---------------- NT       3.22222222222222 




Microsoft SQL Server 2005 Stored Procedure Programming in T-SQL &  .NET
Microsoft SQL Server 2005 Stored Procedure Programming in T-SQL & .NET
ISBN: 0072262281
EAN: 2147483647
Year: 2006
Pages: 165

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net