给 Databend 添加 Aggregate 函数 | 函数开发系例二
TCeason2月 8, 2023
在介绍 给 Databend 添加 Scalar 函数 | 函数开发系例一 后,我们来看 Aggregate Function。
Aggregate Function 用于对列的值进行操作并返回单个值。常见的 Agg Function 有 sum, count, avg 等。
函数注册
Agg Function 的 register 函数以小写的函数名和 AggregateFunctionDescription 类型为参数,每个被注册的函数都存入 case_insensitive_desc ( HashMap 结构) 中。
而 case_insensitive_combinator_desc 是为了存储一些组合函数,比如与
_if
count_if
sum_if
pub struct AggregateFunctionFactory {
case_insensitive_desc: HashMap<String, AggregateFunctionDescription>,
case_insensitive_combinator_desc: Vec<(String, CombinatorDescription)>,
}
impl AggregateFunctionFactory {
...
pub fn register(&mut self, name: &str, desc: AggregateFunctionDescription) {
let case_insensitive_desc = &mut self.case_insensitive_desc;
case_insensitive_desc.insert(name.to_lowercase(), desc);
}
...
}
每个被注册的函数都要实现 trait AggregateFunction 和 AggregateFunctionFeatures。其中 AggregateFunctionFeatures 和 Scalar 中的 FunctionProperty 比较类似,都是存储函数的一些特质。
pub type AggregateFunctionRef = Arc<dyn AggregateFunction>;
pub type AggregateFunctionCreator =
Box<dyn Fn(&str, Vec<Scalar>, Vec<DataType>) -> Result<AggregateFunctionRef> + Sync + Send>;
pub struct AggregateFunctionDescription {
pub(crate) aggregate_function_creator: AggregateFunctionCreator,
pub(crate) features: AggregateFunctionFeatures,
}
主要来看 trait AggregateFunction,这里面是 Agg Function 的构成。
函数构成
可以看到与 Scalar 直接使用一个 Struct 不同,AggregateFunction 是一个 trait。因为聚合函数是按 block 累加列中的数据,再累加过程中会产生一些中间结果。
因此 Aggregate Function 必须有初始状态,而且聚合过程中生成的结果也要是 mergeable (可合并) 和 serializable (可序列化) 的。
主要函数有:
- name 表示被注册的函数的名字,比如 avg, sum 等等。
- return_type 表示被注册的函数返回值的类型,同样的函数返回值可能会由于参数类型的不同而产生变化。比如 sum(int8) 参数为 i8 类型,但是返回返回值可能是 int64。
- init_state 用来初始化聚合函数状态。
- state_layout 用来表示 state 在内存中的大小和内存块的排列方式。
- accumulate 用于 SingleStateAggregator。也就是着整个块可以在单个状态下聚合,没有任何 keys。比如 select count(*) from t 此时查询中没有任何分组列的聚合,这时会调度 accumulate 函数。
- accumulate_keys 则是用于 PartialAggregator。这里需要考虑 key 和 offset,每个 key 代表一个唯一的内存地址,记为函数参数 place。
- serialize 将聚合过程中的 state 序列化为二进制。
- deserialize 从二进制反序列化为 state。
- merge 用于合并其他 state 到当前 state。
- merge_result 可以将 Aggregate Function state 合并成单个值。
示例
以 avg 为例
具体实现在 aggregate_avg.rs 中。
因为我们需要累加每个值,并除以非 null 总行数。因此 avg function 被定义为 struct AggregateAvgFunction <T, SumT>。其中 T 和 SumT 是实现 Number 的逻辑类型。
在聚合过程中 avg 会产生的中间状态值是 已经累加的值的总和 以及 已经扫描过的非 null 的行。因此 AggregateAvgState 可以被定义为如下结构。
#[derive(Serialize, Deserialize)]
struct AggregateAvgState<T: Number> {
#[serde(bound(deserialize = "T: DeserializeOwned"))]
pub value: T,
pub count: u64,
}
- return_type 设置为 Float64Type。比如 value = 3, count = 2, avg = value/count。
- init_state 初始状态设置 value 为 T 的 default 值,count 为 0。
- accumulate AggregateAvgState 的 count, value 分别对 block 中非 NULL 的行数和值进行累加。
- accumulate_keys 通过 获取对应的状态值,并进行更新。
place.get::<AggregateAvgState<SumT>>()
fn accumulate_keys(
&self,
places: &[StateAddr],
offset: usize,
columns: &[Column],
_input_rows: usize,
) -> Result<()> {
let darray = NumberType::<T>::try_downcast_column(&columns[0]).unwrap();
darray.iter().zip(places.iter()).for_each(|(c, place)| {
let place = place.next(offset);
let state = place.get::<AggregateAvgState<SumT>>();
state.add(c.as_(), 1);
});
Ok(())
}
类似的聚合函数示例也可以参考 sum 和 count 的实现:
函数测试
Unit Test
聚合函数相关单元测试在 agg.rs 中。
Logic Test
Functions 相关的 logic 测试在 tests/logictest/suites/base/02_function/ 中。
关于 Databend
Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
- Databend 文档:https://docs.databend.cn/
- Twitter:https://twitter.com/Datafuse_Labs
- Slack:https://datafusecloud.slack.com/
- Wechat:Databend
- GitHub :https://github.com/datafuselabs/databend
订阅我们的新闻简报
及时了解功能发布、产品规划、支持服务和云服务的最新信息!