MongoDB的聚合管道(Aggregation Pipeline)是一种强大的数据处理工具,允许通过多个阶段对文档进行逐步处理、转换和分析。其核心思想是将数据视为流经一系列操作的“管道”,每个阶段处理后的结果传递给下一阶段,最终输出目标数据。

数据按照管道中的每个阶段依次流动;
每个阶段对数据进行特定的转换或筛选;
前一阶段的输出作为下一阶段的输入;
阶段操作符
每个阶段代表一个数据处理操作(如过滤、分组、排序等),按照顺序执行。
前一阶段的输出作为下一阶段的输入,形成处理链。
$match
功能:过滤文档,类似SQL的WHERE。
1 2 3 4 5 6 7 8
| { "$match": { "date": { "$gte": beginday, "$lte": endday } } },
|
$unwind
功能:展开数组字段,将数组元素拆分为独立文档。
1 2 3
| { "$unwind": "$operations" },
|
如果一条原数据中有一个长度为5的operations数组,使用$unwind可以得到5条数据。
$group
功能:按字段分组并计算聚合值(总和、平均等),类似SQL的GROUP BY。
1 2 3 4 5 6 7
| { "$group": { "_id": "$userid", "email": {"$last": "$operations.email"}, "name": {"$last": "$operations.name"}, } },
|
第一个键值对是分组依据,根据此字段划分出多个逻辑组。
后面几个键值对是新文档的结果,使用操作符对文档进行计算,后面的阶段只能使用此阶段声明的属性。
分组依据可以是字段、表达式,或者嵌套对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| { "$group": { "_id": "$userid", "email": {"$last": "$operations.email"}, "name": {"$last": "$operations.name"}, } },
{ $group: { _id: { // 提取 "@" 后的域名部分 domain: { $substr: ["$email", { $indexOfBytes: ["$email", "@"] }, -1] } }, userCount: { $sum: 1 } } }
{ "$group": { "_id": { year: { $year: "$date" }, month: { $month: "$date" } }, "email": {"$last": "$operations.email"}, "name": {"$last": "$operations.name"}, } },
|
在 $group 前使用 $match 或 $project 减少处理的数据量。
$sort
功能:排序结果,类似SQL的ORDER BY。
1 2 3 4 5
| { "$sort": { "date": -1 } },
|
$project
功能:选择或重命名字段,类似SQL的SELECT。
1 2 3 4 5 6 7
| { "$project": { "_id": 0, "email": 1, "name": 1, } }
|
$limit/$skip
功能:分页查询,类似SQL的LIMIT和OFFSET。
1 2 3
| { $sort: { amount: -1 } }, // 1. 按金额降序排序 { $skip: 3 }, // 2. 跳过前3条(即跳过第1页) { $limit: 3 } // 3. 取接下来的3条(第2页)
|
$addFields / $set
功能:新增字段或修改现有字段(不删除其他字段)。
1 2 3 4 5 6 7
| { $addFields: { tax: { $multiply: ["$amount", 0.05] } } }
|
$replaceRoot
功能:将嵌套文档提升为根文档。
1 2 3 4 5 6
| { $replaceRoot: { newRoot: "$details" } }
|
$lookup
功能:关联其他集合,类似SQL的JOIN。
1 2 3 4 5 6 7 8
| { $lookup: { from: "products", localField: "productId", foreignField: "_id", as: "productDetails" } }
|
$natural
按文档在磁盘上的物理存储顺序(自然顺序)排序。
性能优化建议
- 尽早过滤:在管道前段使用$match、$project减少后续处理的数据量。
- 合理使用索引:为$match或$sort涉及的字段创建索引。
- 避免过度拆分阶段:合并可简化的操作,减少管道阶段数。
表达式操作符
每个阶段使用特定的操作符(如 $match、$group)定义数据处理逻辑。
操作符可操作字段、计算聚合值或转换数据结构。
数学操作符
常用操作符:$add, $subtract, $multiply, $divide, $mod, $floor, $ceil
$add 加法
$substract 减法
$multiply 乘法
$divide 除法
$mod 取余
$floor 向下取整
$ceil 向上取整
$toDouble 转换为浮点数
1 2 3 4 5 6 7 8
| { $project: { total: { $multiply: ["$price", "$quantity"] } } }
|
日期操作符
常用操作符:$year, $month, $dayOfMonth, $hour, $dateToString
$year 获取年份
$month 获取月份
$dayOfMonth 获取日
$hour 获取当前小时
$dateToString 转换成字符串
1 2 3 4 5 6 7 8
| { $dateToString: { format: "<格式字符串>", // 必填,定义输出格式 date: "<日期字段或表达式>", // 必填,日期字段或生成日期的表达式 timezone: "<时区>", // 可选,指定时区(如 "Asia/Shanghai") onNull: "<替代值>" // 可选,日期为空时的默认值(如 "N/A") } }
|
1 2 3 4 5 6 7
| { $project: { year: { $year: "$orderDate" }, month: { $month: "$orderDate" } } }
|
字符串操作符
常用操作符:$substr, $concat, $toLower, $toUpper, $trim
$substr 截取字符串
$concat 拼接
$toLower 转换为小写
$toUpper 转换为大写
$trim 去掉首尾空格
$split 将字符串拆分为数组
1 2 3 4 5
| { $project: { fruits: { $split: ["$textField", ","] } // 按逗号分割 } }
|
1 2 3 4 5 6 7 8 9
| { $project: { fullName: { $toUpper: { $concat: ["$firstName", " ", "$lastName"] } } } }
|
逻辑操作符
常用操作符:$and, $or, $not, $cond(条件判断)
$and 逻辑与
$or 逻辑或
$not 逻辑否
$cond 条件判断
$ne 不等于指定值
$ifNull 字段值为 null 或字段不存在的情况,返回指定的默认值
1 2 3 4 5 6
| { $project: { displayName: { $ifNull: ["$nickname", "Anonymous"] } } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| { $project: { isHighValue: { $cond: { if: { $gte: ["$amount", 1000] }, then: "Yes", else: "No" } } } }
|
1 2 3 4 5 6
| "$cond": [ {"$eq": ["$operations.opcode", 1001]}, "$operations.email", "$$REMOVE" ]
|
聚合操作符
常用操作符:$sum, $avg, $max, $min, $push, $addToSet
$sum 取和
$avg 取平均数
$max 取最大值
$min 取最小值
$push 往数组中添加元素
$addToSet 往Set中添加元素,自动去重
1 2 3 4 5 6 7
| { $group: { _id: "$customer", orderIds: { $push: "$_id" } } }
|
数组操作符
常用操作符:$size, $slice, $map, $filter
$size 获取数组长度
$slice 截取数组元素
$map 遍历数组
$filter 过滤数组
$nin 不属于指定数组中的任意值
1 2 3 4 5 6 7 8 9 10 11 12 13
| { $project: { topReviews: { $filter: { input: "$reviews", as: "review", cond: { $gte: ["$$review.rating", 4] } } } } }
|
自定义脚本操作符
$function
$function 允许在聚合管道中执行自定义的 JavaScript 函数,用于处理复杂逻辑或实现内置操作符无法直接完成的操作。
基本语法:
1 2 3 4 5 6 7
| { $function: { body: <function>, // JavaScript 函数 args: [<表达式1>, <表达式2>, ...], // 参数列表(可引用字段或计算结果) lang: "js" // 目前仅支持 JavaScript } }
|
举例:
1 2 3 4 5 6 7 8 9 10 11 12 13
| { $project: { formattedName: { $function: { body: function(name) { return name.charAt(0).toUpperCase() + name.slice(1).toLowerCase(); }, args: ["$name"], // 参数为字段 name 的值 lang: "js" } } } }
|
JavaScript 执行效率低于内置操作符,避免在大数据集或高频操作中使用。
函数体必须是 单行字符串(需转义换行符)或通过 toString() 序列化。
函数参数通过 args 传递,支持聚合表达式(如 “$field”、{ $add: […] })。
$accumulator
$accumulator 操作符允许在聚合管道的 $group 阶段执行自定义累加逻辑,适用于复杂的分组计算场景(如加权平均、动态数据结构维护)。
基本语法:
1 2 3 4 5 6 7 8 9 10
| { $accumulator: { init: <初始化函数>, // 初始化累加状态的函数(返回初始值) accumulate: <累加函数>, // 处理单个文档,更新累加状态 accumulateArgs: [<参数列表>], // 传递给 accumulate 函数的参数(可引用字段) merge: <合并函数>, // 合并不同分片/线程的累加状态 finalize: <终止函数>, // (可选)对最终结果进行后处理 lang: "js" // 目前仅支持 JavaScript } }
|
init 初始化累加器的状态(如 () => ({ sum: 0, count: 0 }))。
accumulate 对每个文档执行,更新累加状态(如 (state, value) => { state.sum += value })。
merge 合并多个分片/并行计算的中间结果(如 (state1, state2) => { … })。
finalize (可选)对最终状态进行加工(如 (state) => state.sum / state.count)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
{ $group: { _id: "$class", weightedAvg: { $accumulator: { init: function() { return { totalScore: 0, totalWeight: 0 }; }, accumulate: function(state, score, weight) { state.totalScore += score * weight; state.totalWeight += weight; return state; }, accumulateArgs: ["$score", "$weight"], // 传递字段值作为参数 merge: function(state1, state2) { return { totalScore: state1.totalScore + state2.totalScore, totalWeight: state1.totalWeight + state2.totalWeight }; }, finalize: function(state) { return state.totalScore / state.totalWeight; }, lang: "js" } } } }
|
自定义 JavaScript 代码执行效率低于内置操作符(如 $sum、$avg)。
函数体需为字符串或通过 toString() 序列化。
在 accumulate 和 merge 中需返回新状态,避免直接修改输入状态。