- NoneGrouping
- AllGrouping
- PartialKeyGrouping
2021SC@SDUSC

NoneGrouping
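Use this grouping when you do not care how the stream is distributed across the parallel tasks. Currently it behaves exactly like Shuffle Grouping. According to the Storm documentation, Storm may eventually push bolts that use None Grouping down to run in the same thread as the upstream spout or bolt they subscribe to, when possible.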
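```java
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;

// Nested in Storm's GrouperFactory.
public static class NoneGrouper implements CustomStreamGrouping {
    private final Random random;
    private List<Integer> targetTasks;
    private int numTasks;
    public NoneGrouper() {
        random = new Random();
    }
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        this.numTasks = targetTasks.size();
    }
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // Pick one target task uniformly at random for every tuple.
        int index = random.nextInt(numTasks);
        return Collections.singletonList(targetTasks.get(index));
    }
}
```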
Here random.nextInt(numTasks) picks one target task at random for each tuple.
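To see the effect of random.nextInt(numTasks) without a Storm cluster, here is a minimal standalone sketch. RandomPicker and simulate are hypothetical names, no Storm classes are used, and the Random is seeded only so the run is reproducible (NoneGrouper itself uses an unseeded Random).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical standalone sketch of NoneGrouper-style random task selection.
class RandomPicker {
    private final Random random;
    private final List<Integer> targetTasks;

    RandomPicker(List<Integer> targetTasks, long seed) {
        this.targetTasks = targetTasks;
        this.random = new Random(seed); // seeded only to make the demo reproducible
    }

    // Same idea as chooseTasks: one uniformly random target task per tuple.
    List<Integer> chooseTasks() {
        int index = random.nextInt(targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

    // Sends n tuples and counts how many each task receives.
    static int[] simulate(List<Integer> tasks, int n, long seed) {
        RandomPicker picker = new RandomPicker(tasks, seed);
        int[] counts = new int[tasks.size()];
        for (int i = 0; i < n; i++) {
            int task = picker.chooseTasks().get(0);
            counts[tasks.indexOf(task)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = simulate(Arrays.asList(5, 6, 7), 30_000, 42L);
        System.out.println(Arrays.toString(counts)); // roughly 10000 per task
    }
}
```

Over many tuples the load is spread roughly evenly, although individual tasks can differ slightly because every choice is independent.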
AllGrouping

AllGrouping broadcasts the stream: every tuple is sent to all tasks of the subscribing bolt.
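```java
import java.util.List;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;

public static class AllGrouper implements CustomStreamGrouping { // nested in Storm's GrouperFactory
    private List<Integer> targetTasks;
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        return targetTasks; // broadcast: every target task receives every tuple
    }
}
```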
Here chooseTasks simply returns all of targetTasks.
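Because chooseTasks returns every target task, the cost of broadcasting grows with the bolt's parallelism: the number of deliveries is the number of tuples times the number of tasks. A small standalone sketch of that arithmetic (Broadcast and deliveries are hypothetical names; no Storm classes are involved):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch of AllGrouper-style broadcast delivery.
class Broadcast {
    // Mirrors AllGrouper.chooseTasks: every tuple goes to every target task.
    static List<Integer> chooseTasks(List<Integer> targetTasks) {
        return targetTasks;
    }

    // Total number of tuple deliveries for numTuples emitted tuples.
    static int deliveries(List<Integer> targetTasks, int numTuples) {
        int total = 0;
        for (int i = 0; i < numTuples; i++) {
            total += chooseTasks(targetTasks).size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(3, 4, 5, 6);
        System.out.println(deliveries(tasks, 10)); // prints 40
    }
}
```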
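PartialKeyGrouping

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.Fields;

public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;                 // grouping fields; null = use the first value
    private Fields outFields = null;              // output fields of the subscribed stream
    private AssignmentCreator assignmentCreator;  // chooses the candidate tasks for a key
    private TargetSelector targetSelector;        // chooses one task among the candidates

    public PartialKeyGrouping() {
        this(null);
    }
    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }
    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }
    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            // Needed later to locate the grouping fields inside each tuple.
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            byte[] rawKeyBytes = getKeyBytes(values);  // 1. extract the key
            int[] candidates = assignmentCreator.createAssignment(targetTasks, rawKeyBytes);  // 2. candidate tasks
            boltIds.add(targetSelector.chooseTask(candidates));  // 3. pick one of the candidates
        }
        return boltIds;
    }

    // getKeyBytes() and the nested AssignmentCreator, TargetSelector,
    // RandomTwoTaskAssignmentCreator and BalancedTargetSelector types are omitted.
}
```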
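In the version of the code shown above, prepare only stores targetTasks and, when fields is set, outFields. The per-task counter targetTaskStats, which records how many times each task has been selected, is maintained by BalancedTargetSelector; older versions initialized it in prepare as a long[] targetTaskStats.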
If no fields are specified, PartialKeyGrouping computes the key from the first field of the output tuple by default.
Given the candidate task IDs, BalancedTargetSelector consults targetTaskStats and returns taskIdWithMinLoad, the candidate that has been selected the fewest times so far.
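The least-loaded choice can be sketched as follows. This is a standalone approximation rather than Storm's code: MinLoadSelector and loadOf are hypothetical names, and the sketch assumes the counts are kept in a Map from task ID to the number of times that task was chosen.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a least-loaded selector in the spirit of BalancedTargetSelector.
class MinLoadSelector {
    // How many times each task has been selected so far.
    private final Map<Integer, Long> targetTaskStats = new HashMap<>();

    // Picks the candidate with the smallest count; ties go to the earlier candidate.
    int chooseTask(int[] candidates) {
        int taskIdWithMinLoad = candidates[0];
        long minLoad = Long.MAX_VALUE;
        for (int taskId : candidates) {
            long load = targetTaskStats.getOrDefault(taskId, 0L);
            if (load < minLoad) {
                minLoad = load;
                taskIdWithMinLoad = taskId;
            }
        }
        // Record the selection so the next call sees the updated load.
        targetTaskStats.merge(taskIdWithMinLoad, 1L, Long::sum);
        return taskIdWithMinLoad;
    }

    long loadOf(int taskId) {
        return targetTaskStats.getOrDefault(taskId, 0L);
    }

    public static void main(String[] args) {
        MinLoadSelector selector = new MinLoadSelector();
        int[] candidates = {2, 7};
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            sb.append(selector.chooseTask(candidates)).append(' ');
        }
        System.out.println(sb.toString().trim()); // prints "2 7 2 7"
    }
}
```

With the same two candidates on every call, the selector alternates between them, so a stream of identical keys is split evenly across the pair.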
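RandomTwoTaskAssignmentCreator chooses two candidate task IDs for each key, and the selector then picks whichever of the two has been used less. A single hot key can therefore be spread over two tasks instead of overloading one.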
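The two-candidate step can also be sketched without Storm. The assumption behind this sketch (hypothetical class TwoChoiceAssigner) is that a given key must always map to the same pair of tasks, otherwise its tuples could land on any task; so java.util.Random is seeded from Arrays.hashCode(key) and two distinct indexes are drawn.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of picking two candidate tasks for a key.
class TwoChoiceAssigner {
    static int[] createAssignment(List<Integer> tasks, byte[] key) {
        // Seeding from the key's hash makes the pair deterministic per key.
        Random random = new Random(Arrays.hashCode(key));
        int choice1 = random.nextInt(tasks.size());
        int choice2 = random.nextInt(tasks.size());
        // Force two distinct candidates (only possible with at least two tasks).
        if (choice2 == choice1) {
            choice2 = (choice2 + 1) % tasks.size();
        }
        return new int[] {tasks.get(choice1), tasks.get(choice2)};
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(10, 11, 12, 13);
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(createAssignment(tasks, key)));
    }
}
```

Combined with a least-loaded selector, each key is confined to at most two tasks, which keeps per-key state in two places while still splitting a hot key's load.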
The getKeyBytes() method extracts the key from the input tuple.
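A simplified sketch of key extraction, including the default of using the first field when no grouping fields are given. The real method works with the Fields objects set up in prepare (fields and outFields); here they are replaced by plain field positions, KeyBytes is a hypothetical name, and hashing each selected value into 4 bytes is an assumption of this sketch that may differ from Storm's exact encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of getKeyBytes-style key extraction.
class KeyBytes {
    // keyIndexes == null means "no grouping fields": fall back to the first value.
    static byte[] getKeyBytes(List<Object> values, int[] keyIndexes) {
        if (keyIndexes == null) {
            return values.get(0).toString().getBytes(StandardCharsets.UTF_8);
        }
        // Pack one 4-byte hash per selected field, in field order.
        ByteBuffer out = ByteBuffer.allocate(keyIndexes.length * 4);
        for (int i : keyIndexes) {
            Object o = values.get(i);
            out.putInt(o instanceof byte[] ? Arrays.hashCode((byte[]) o) : o.hashCode());
        }
        return out.array();
    }

    public static void main(String[] args) {
        List<Object> tuple = Arrays.<Object>asList("user-42", 3, "click");
        byte[] byDefault = getKeyBytes(tuple, null);
        byte[] byFields = getKeyBytes(tuple, new int[] {0, 2});
        System.out.println(byDefault.length + " " + byFields.length); // prints "7 8"
    }
}
```

Tuples that agree on the selected fields produce identical key bytes, which is what lets the assignment step send them to the same candidate pair.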



