All versions of SQL Server have supported built-in aggregate functions such as Count(), Sum(), and Max(). SQL Server 2005 introduces user-defined aggregate CLR functions. They are categorized by usage as (scalar) functions. However, their implementation makes them more like UDTs (and just as complex), so I am introducing them here.
Aggregates are defined as structures (structs) or classes that must implement specific interfaces so that the compiler can convert them to aggregate functions in SQL Server 2005. Managed aggregates must contain the SqlUserDefinedAggregate attribute, an interface for aggregating values in the form of four aggregation methods, and the IBinarySerialize interface for storing intermediate states of aggregation.
The interface that CLR will use to aggregate values consists of four methods:
Init()
Accumulate()
Merge()
Terminate()
The Init() method is used by the query processor to initialize computation of the aggregation:
    Public Sub Init()
        arr = New Double(100) {}
        count = 0
    End Sub
It should also perform any cleanup that is needed to start the processing of a set of values.
Accumulate() is called for each value of the set that is being aggregated. It updates the total of the aggregate instance with the value that is used as an argument. The data type of the parameter should be a managed SQL Server data type equivalent to a native SQL Server data type that will be used in the Create Aggregate statement, or it could also be a managed UDT:
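    Public Sub Accumulate(ByVal value As SqlDouble)
        ' Skip Nulls; reading Value of a Null SqlDouble throws an exception
        If value.IsNull Then Return
        ' Grow the buffer when it is full
        If count = arr.Length Then Array.Resize(arr, Math.Max(4, arr.Length * 2))
        arr(count) = value.Value
        count = count + 1
    End Sub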
Merge() is used by the query processor to put together another instance of aggregation (processing one subset of values) with the current instance (processing its own subset of values). This is used when the query processor divides the original set into multiple subsets for parallel processing. The data type of its parameter must be the aggregation class itself:
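    Public Sub Merge(ByVal group As MedianFloat)
        ' Copy only the values the other instance has actually accumulated,
        ' not the unused tail of its buffer
        For i As Integer = 0 To group.count - 1
            If count = arr.Length Then Array.Resize(arr, Math.Max(4, arr.Length * 2))
            arr(count) = group.arr(i)
            count = count + 1
        Next i
    End Sub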
The Terminate() function performs the final calculations and returns the result to the caller:
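    Public Function Terminate() As SqlDouble
        ' An empty set has no median
        If count = 0 Then Return SqlDouble.Null
        Array.Resize(arr, count)
        Array.Sort(arr)
        Dim n As Integer = count \ 2
        If n * 2 = count Then
            ' Even count: average the two middle values (arrays are zero-based)
            median = (arr(n - 1) + arr(n)) / 2.0
        Else
            ' Odd count: take the middle value
            median = arr(n)
        End If
        Return median
    End Function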
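The way the four methods cooperate can be modeled outside SQL Server. The following is a minimal sketch in Python, not the CLR implementation: the class name MedianModel, the run() helper, and the sample values are all made up for illustration, and None stands in for a SQL Null. It shows that splitting the input into subsets and merging the partial states gives the same median as a single pass.

```python
class MedianModel:
    """Python stand-in for the aggregate contract (hypothetical names)."""

    def init(self):
        # Reset state so the instance can be reused for another group
        self.values = []

    def accumulate(self, value):
        # Called once per input value; None plays the role of a SQL Null
        if value is not None:
            self.values.append(value)

    def merge(self, other):
        # Fold in the partial state of another instance (parallel plans)
        self.values.extend(other.values)

    def terminate(self):
        # Final calculation: middle value, or mean of the two middle values
        if not self.values:
            return None
        ordered = sorted(self.values)
        n = len(ordered) // 2
        if len(ordered) % 2 == 0:
            return (ordered[n - 1] + ordered[n]) / 2.0
        return ordered[n]


def run(partitions):
    """Aggregate each partition separately, then merge the partial results."""
    instances = []
    for part in partitions:
        agg = MedianModel()
        agg.init()
        for v in part:
            agg.accumulate(v)
        instances.append(agg)
    first = instances[0]
    for other in instances[1:]:
        first.merge(other)
    return first.terminate()


# Made-up sample values
single = run([[5.0, 1.0, None, 4.0, 2.0, 3.0]])
split = run([[5.0, 1.0, None], [4.0, 2.0, 3.0]])
```

Both single and split hold the same median, which is the property the query processor relies on when it chooses a parallel plan. Note the zero-based indexing in terminate(): for an odd count the middle element is at index n, not n + 1.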
The IBinarySerialize interface, with Read() and Write() methods, is used for storing intermediate states of the aggregate instance. The aggregate class must also be preceded with the Serializable attribute and the class must be marked to implement the IBinarySerialize interface:
    <Serializable()> _
    ...
    Public Class MedianFloat
        Implements IBinarySerialize
        ...
        Public Sub Read(ByVal r As BinaryReader) _
            Implements IBinarySerialize.Read
            count = r.ReadInt32()
            ' VB array bounds are upper bounds, so count - 1 gives count elements
            arr = New Double(count - 1) {}
            For i As Integer = 0 To count - 1
                arr(i) = r.ReadDouble()
            Next i
        End Sub

        Public Sub Write(ByVal w As BinaryWriter) _
            Implements IBinarySerialize.Write
            w.Write(count)
            ' Write only the values in use, so the stream matches the count
            For i As Integer = 0 To count - 1
                w.Write(arr(i))
            Next i
        End Sub
The format that I have selected for storing the intermediate state is very simple: an integer that contains the count of elements in the array, followed by the individual elements of the array.
Like all other managed database objects, a class of aggregates must be preceded by an attribute that specifies its type, SqlUserDefinedAggregate. The attribute takes the following properties:
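To make the layout concrete, here is a small Python sketch of the same count-then-elements format. It is only a model under stated assumptions: the function names are hypothetical, and it assumes the 4-byte little-endian integer and 8-byte double encodings that BinaryWriter uses for Int32 and Double values.

```python
import io
import struct


def write_state(values):
    """Serialize as a 4-byte little-endian count followed by 8-byte doubles."""
    buf = io.BytesIO()
    buf.write(struct.pack("<i", len(values)))
    for v in values:
        buf.write(struct.pack("<d", v))
    return buf.getvalue()


def read_state(blob):
    """Read the count first, then exactly that many doubles."""
    (count,) = struct.unpack_from("<i", blob, 0)
    return list(struct.unpack_from("<%dd" % count, blob, 4))


def max_values(max_byte_size):
    """How many doubles fit once the 4-byte count is accounted for."""
    return (max_byte_size - 4) // 8


state = [1.5, -2.25, 19.86]      # made-up values
blob = write_state(state)        # 4 + 3 * 8 bytes
restored = read_state(blob)
capacity = max_values(8000)      # size limit used in the listings in this section
```

The reader must consume exactly as many elements as the count announces, starting with the first one, or the restored state will not match what was written. The size arithmetic also shows a practical limit of this design: with an 8000-byte cap on the serialized state, only (8000 - 4) / 8 values, rounded down, can be held.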
Format | Specifies how the class will be serialized. Possible values are Format.UserDefined and Format.Native. |
IsNullIfEmpty | Forces the query processor to return a Null value if the aggregate is applied to an empty set. |
IsInvariantToNulls | Notifies the query processor that Null values have no effect on the result of the aggregate. |
IsInvariantToDuplicates | Notifies the query optimizer that duplicates should not affect the result of the aggregate. |
IsInvariantToOrder | Has no effect in SQL Server 2005. |
MaxByteSize | Specifies the maximum size of the aggregate instance, in bytes. |
When you put elements from the previous sections together, the aggregate looks like this:
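    Imports System
    Imports System.Data
    Imports System.Data.SqlClient
    Imports System.Data.SqlTypes
    Imports Microsoft.SqlServer.Server
    Imports System.Collections
    Imports System.IO
    Imports System.Text

    ' A median changes when duplicates are removed,
    ' so IsInvariantToDuplicates is set to False
    <Serializable()> _
    <SqlUserDefinedAggregate( _
        Format.UserDefined, _
        IsInvariantToDuplicates:=False, _
        IsInvariantToNulls:=True, _
        IsInvariantToOrder:=True, _
        IsNullIfEmpty:=True, _
        MaxByteSize:=8000)> _
    Public Class MedianFloat
        Implements IBinarySerialize

        Public Sub Init()
            arr = New Double(100) {}
            count = 0
        End Sub

        Public Sub Accumulate(ByVal value As SqlDouble)
            ' Skip Nulls; reading Value of a Null SqlDouble throws an exception
            If value.IsNull Then Return
            ' Grow the buffer when it is full
            If count = arr.Length Then Array.Resize(arr, Math.Max(4, arr.Length * 2))
            arr(count) = value.Value
            count = count + 1
        End Sub

        Public Sub Merge(ByVal group As MedianFloat)
            ' Copy only the values the other instance has actually accumulated
            For i As Integer = 0 To group.count - 1
                If count = arr.Length Then Array.Resize(arr, Math.Max(4, arr.Length * 2))
                arr(count) = group.arr(i)
                count = count + 1
            Next i
        End Sub

        Public Function Terminate() As SqlDouble
            ' An empty set has no median
            If count = 0 Then Return SqlDouble.Null
            Array.Resize(arr, count)
            Array.Sort(arr)
            Dim n As Integer = count \ 2
            If n * 2 = count Then
                ' Even count: average the two middle values (arrays are zero-based)
                median = (arr(n - 1) + arr(n)) / 2.0
            Else
                ' Odd count: take the middle value
                median = arr(n)
            End If
            Return median
        End Function

        Public Sub Read(ByVal r As BinaryReader) _
            Implements IBinarySerialize.Read
            count = r.ReadInt32()
            ' VB array bounds are upper bounds, so count - 1 gives count elements
            arr = New Double(count - 1) {}
            For i As Integer = 0 To count - 1
                arr(i) = r.ReadDouble()
            Next i
        End Sub

        Public Sub Write(ByVal w As BinaryWriter) _
            Implements IBinarySerialize.Write
            w.Write(count)
            ' Write only the values in use, so the stream matches the count
            For i As Integer = 0 To count - 1
                w.Write(arr(i))
            Next i
        End Sub

        ' field members
        Private median As Double
        Public arr() As Double
        Public count As Integer
    End Class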
Or in C#, a similar aggregate function would look like this:
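    using System;
    using System.Data.Sql;
    using System.Data.SqlTypes;
    using Microsoft.SqlServer.Server;
    using System.Runtime.InteropServices;
    using System.Collections;
    using System.IO;
    using System.Text;

    namespace MyAggs
    {
        [Serializable]
        [SqlUserDefinedAggregate(
            Format.UserDefined,
            IsInvariantToNulls = true,
            IsInvariantToDuplicates = false,
            MaxByteSize = 8000)]
        public class MedianDouble : IBinarySerialize
        {
            private double[] arr;
            double median;
            private int count = 0;

            public void Init()
            {
                arr = new double[100];
                count = 0;
            }

            public void Accumulate(SqlDouble value)
            {
                // Skip Nulls; reading Value of a Null SqlDouble throws
                if (value.IsNull) return;
                // Grow the buffer when it is full
                if (count == arr.Length)
                    Array.Resize(ref arr, Math.Max(4, arr.Length * 2));
                arr[count] = value.Value;
                count++;
            }

            public void Merge(MedianDouble group)
            {
                // Copy only the values the other instance has accumulated
                for (int i = 0; i < group.count; i++)
                {
                    if (count == arr.Length)
                        Array.Resize(ref arr, Math.Max(4, arr.Length * 2));
                    arr[count] = group.arr[i];
                    count++;
                }
            }

            public SqlDouble Terminate()
            {
                // An empty set has no median
                if (count == 0) return SqlDouble.Null;
                Array.Resize(ref arr, count);
                Array.Sort(arr);
                int n = count / 2;
                if (n * 2 == count)
                    // Even count: average the two middle values (zero-based)
                    median = (arr[n - 1] + arr[n]) / 2;
                else
                    // Odd count: take the middle value
                    median = arr[n];
                return median;
            }

            public void Read(BinaryReader r)
            {
                count = r.ReadInt32();
                arr = new double[count];
                for (int i = 0; i < count; i++)
                    arr[i] = r.ReadDouble();
            }

            public void Write(BinaryWriter w)
            {
                w.Write(count);
                // Write only the values in use, so the stream matches the count
                for (int i = 0; i < count; i++)
                    w.Write(arr[i]);
            }
        }
    }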
As with other CLR database objects, you can deploy CLR aggregates using Visual Studio 2005 or using a custom script:
    If exists(select * from sys.assembly_modules
              where assembly_class = 'VbAgg.MedianFloat')
        drop AGGREGATE MedianFloat
    GO
    IF EXISTS (select * from sys.assemblies where name = N'VbAgg')
        DROP ASSEMBLY VbAgg;
    GO
    CREATE ASSEMBLY VbAgg
    FROM 'C:\Projects\VbAgg\VbAgg\bin\debug\VbAgg.dll'
    WITH permission_set=Safe;
    GO
    CREATE AGGREGATE MedianFloat(@input float)
    RETURNS float
    EXTERNAL NAME [VbAgg].[VbAgg.MedianFloat];
    GO
You can use managed aggregates on any set of data on which you can use built-in aggregates. The simplest example is to create a table and run the aggregate function on it:
    if object_id('Data') is not null
        drop table dbo.Data
    create table dbo.Data(id int identity, num real)
    INSERT INTO dbo.Data (num) VALUES (-911.5);
    INSERT INTO dbo.Data (num) VALUES (9.6);
    INSERT INTO dbo.Data (num) VALUES (91.88);
    INSERT INTO dbo.Data (num) VALUES (509.86);
    INSERT INTO dbo.Data (num) VALUES (-911.5);
    INSERT INTO dbo.Data (num) VALUES (90.6);
    INSERT INTO dbo.Data (num) VALUES (-1.88);
    INSERT INTO dbo.Data (num) VALUES (19.86);
    INSERT INTO dbo.Data (num) VALUES (18888.86);
    select * from Data order by num
    select dbo.MedianFloat(num) MedianFloat from Data
The result of the last two statements is:
    id          num
    ----------- -------------
    1           -911.5
    5           -911.5
    7           -1.88
    2           9.6
    8           19.86
    6           90.6
    3           91.88
    4           509.86
    9           18888.86

    (9 row(s) affected)

    MedianFloat
    -----------
    19.86

    (1 row(s) affected)
The following example is based on cudt_PriceDate, which was created earlier in the chapter. It calculates the moving average price within a window of a predefined number of days.
    using System;
    using System.Data.Sql;
    using System.Data.SqlTypes;
    using Microsoft.SqlServer.Server;
    using System.IO;
    using System.Text;
    using MyUDTs;

    namespace MyAggs
    {
        [Serializable]
        [SqlUserDefinedAggregate(
            Format.UserDefined,
            IsInvariantToNulls = true,
            IsInvariantToDuplicates = false,
            IsInvariantToOrder = false,
            MaxByteSize = 8000)]
        public class agg_MovingAvg : IBinarySerialize
        {
            private double sum = 0;
            private DateTime startDt;   // newest edge of the window (today)
            private DateTime endDt;     // oldest edge of the window
            private double ma;
            private Int32 count = 0;
            public static readonly int daysNum = 50;

            #region Aggregation Methods
            /// <summary>
            /// Initialize the internal data structures
            /// </summary>
            public void Init()
            {
                sum = 0;
                count = 0;
                startDt = DateTime.Today;
                endDt = startDt.AddDays(-1 * daysNum);
            }

            /// <summary>
            /// Accumulate the next value, unless the value is null
            /// </summary>
            /// <param name="value">Another value to be aggregated</param>
            public void Accumulate(cudt_PriceDate value)
            {
                if (value.IsNull)
                {
                    return;
                }
                if (value.BusinessDay > endDt && value.BusinessDay < this.startDt)
                {
                    sum += (double)value.StockPrice;
                    count++;
                }
            }

            /// <summary>
            /// Merge the partially computed aggregate with this aggregate
            /// </summary>
            /// <param name="other">Another set of data to be added</param>
            public void Merge(agg_MovingAvg other)
            {
                sum += other.sum;
                count += other.count;
            }

            /// <summary>
            /// Called at the end of aggregation
            /// to return the results of the aggregation.
            /// </summary>
            /// <returns>the aggregated value</returns>
            public SqlDouble Terminate()
            {
                // No prices fell inside the window
                if (count == 0) return SqlDouble.Null;
                ma = sum / count;
                return new SqlDouble(ma);
            }
            #endregion

            #region IBinarySerialize
            public void Read(BinaryReader r)
            {
                sum = r.ReadDouble();
                count = r.ReadInt32();
                // The window is not serialized, so recompute it here
                startDt = DateTime.Today;
                endDt = startDt.AddDays(-1 * daysNum);
            }

            public void Write(BinaryWriter w)
            {
                w.Write(sum);
                w.Write(count);
            }
            #endregion
        }
    }
To test the new aggregation function, I will create a table based on the UDT. I will insert a couple of records in it and then perform the aggregation.
    if object_id('dbo.Stock') is not null
        drop table dbo.Stock
    create table dbo.Stock(
        id int identity,
        stock varchar(100),
        priceClose cudt_PriceDate)
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('3.05;2-Dec-05' as cudt_PriceDate))
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('3.08;1-Dec-05' as cudt_PriceDate))
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('2.90;30-Nov-05' as cudt_PriceDate))
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('2.84;29-Nov-05' as cudt_PriceDate))
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('2.92;28-Nov-05' as cudt_PriceDate))
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('3.01;25-Nov-05' as cudt_PriceDate))
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('3.06;23-Nov-05' as cudt_PriceDate))
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('3.04;22-Nov-05' as cudt_PriceDate))
    INSERT INTO dbo.Stock(stock, priceClose)
    values('NT', Cast('5.10;21-Nov-05' as cudt_PriceDate))
    select Stock, dbo.agg_MovingAvg(priceClose) MovingAvg
    from Stock
    group by stock
The result will be:
    Stock    MovingAvg
    -------- ----------------
    NT       3.22222222222222
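The arithmetic behind this result can be checked with a short Python model. This is a sketch under assumptions, not the CLR code: the class name MovingAvgModel is hypothetical, the run date is fixed at 3-Dec-05 so that all nine rows fall inside the 50-day window (the real aggregate uses the current date), and the first row is read as 2-Dec-05.

```python
from datetime import date, timedelta

DAYS_NUM = 50


class MovingAvgModel:
    """Hypothetical stand-in for the windowed-average aggregate."""

    def __init__(self, today):
        # 'today' is passed in so the example is reproducible (assumption)
        self.total = 0.0
        self.count = 0
        self.newest = today
        self.oldest = today - timedelta(days=DAYS_NUM)

    def accumulate(self, price, business_day):
        # Keep only prices strictly inside the window
        if self.oldest < business_day < self.newest:
            self.total += price
            self.count += 1

    def merge(self, other):
        # Sums and counts from two subsets simply add up
        self.total += other.total
        self.count += other.count

    def terminate(self):
        return self.total / self.count if self.count else None


rows = [
    (3.05, date(2005, 12, 2)), (3.08, date(2005, 12, 1)),
    (2.90, date(2005, 11, 30)), (2.84, date(2005, 11, 29)),
    (2.92, date(2005, 11, 28)), (3.01, date(2005, 11, 25)),
    (3.06, date(2005, 11, 23)), (3.04, date(2005, 11, 22)),
    (5.10, date(2005, 11, 21)),
]
today = date(2005, 12, 3)  # assumed run date

whole = MovingAvgModel(today)
for price, day in rows:
    whole.accumulate(price, day)

# Same rows split into two subsets and merged, as in a parallel plan
left, right = MovingAvgModel(today), MovingAvgModel(today)
for price, day in rows[:5]:
    left.accumulate(price, day)
for price, day in rows[5:]:
    right.accumulate(price, day)
left.merge(right)
```

The nine prices add up to 29.00, so both whole.terminate() and left.terminate() come out at 29 / 9, matching the result above up to floating-point rounding. Storing a sum and a count, rather than a running average, is what makes the merge step a simple addition.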