首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

Elsa V3学习之Flowchart详解(上)

编程知识
2024年08月19日 21:03

前面我们通过界面学习了Elsa的一些基本使用,若是有实操的小伙伴们,应该可以发现,我们工作流定义中的root,既我们的工作流画布其实也是一个activity,就是Flowchart。那么本文将来解读以下flowchart的执行逻辑。

Flowchart源码

为了方便大家,这里先直接把flowchart的源码贴出。

using System.ComponentModel;
using System.Runtime.CompilerServices;
using Elsa.Extensions;
using Elsa.Workflows.Activities.Flowchart.Contracts;
using Elsa.Workflows.Activities.Flowchart.Extensions;
using Elsa.Workflows.Activities.Flowchart.Models;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Options;
using Elsa.Workflows.Signals;
using Microsoft.Extensions.Logging;

namespace Elsa.Workflows.Activities.Flowchart.Activities;

/// <summary>
/// A flowchart consists of a collection of activities and connections between them.
/// </summary>
[Activity("Elsa", "Flow", "A flowchart is a collection of activities and connections between them.")]
[Browsable(false)]
public class Flowchart : Container
{
    internal const string ScopeProperty = "Scope";

    /// <inheritdoc />
    public Flowchart([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
    {
        OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
        OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
        OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
    }

    /// <summary>
    /// The activity to execute when the flowchart starts.
    /// </summary>
    [Port]
    [Browsable(false)]
    public IActivity? Start { get; set; }

    /// <summary>
    /// A list of connections between activities.
    /// </summary>
    public ICollection<Connection> Connections { get; set; } = new List<Connection>();

    /// <inheritdoc />
    protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
    {
        var startActivity = GetStartActivity(context);

        if (startActivity == null)
        {
            // Nothing else to execute.
            await context.CompleteActivityAsync();
            return;
        }

        // Schedule the start activity.
        await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
    }

    private IActivity? GetStartActivity(ActivityExecutionContext context)
    {
        // If there's a trigger that triggered this workflow, use that.
        var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
        var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : default;

        if (triggerActivity != null)
            return triggerActivity;

        // If an explicit Start activity was provided, use that.
        if (Start != null)
            return Start;

        // If there is a Start activity on the flowchart, use that.
        var startActivity = Activities.FirstOrDefault(x => x is Start);

        if (startActivity != null)
            return startActivity;

        // If there's an activity marked as "Can Start Workflow", use that.
        var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());

        if (canStartWorkflowActivity != null)
            return canStartWorkflowActivity;

        // If there is a single activity that has no inbound connections, use that.
        var root = GetRootActivity();

        if (root != null)
            return root;

        // If no start activity found, return the first activity.
        return Activities.FirstOrDefault();
    }

    /// <summary>
    /// Checks if there is any pending work for the flowchart.
    /// </summary>
    private bool HasPendingWork(ActivityExecutionContext context)
    {
        var workflowExecutionContext = context.WorkflowExecutionContext;
        var activityIds = Activities.Select(x => x.Id).ToList();
        var descendantContexts = context.GetDescendents().Where(x => x.ParentActivityExecutionContext == context).ToList();
        var activityExecutionContexts = descendantContexts.Where(x => activityIds.Contains(x.Activity.Id)).ToList();

        var hasPendingWork = workflowExecutionContext.Scheduler.List().Any(workItem =>
        {
            var ownerInstanceId = workItem.Owner?.Id;

            if (ownerInstanceId == null)
                return false;

            if (ownerInstanceId == context.Id)
                return true;

            var ownerContext = context.WorkflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == ownerInstanceId);
            var ancestors = ownerContext.GetAncestors().ToList();

            return ancestors.Any(x => x == context);
        });

        var hasRunningActivityInstances = activityExecutionContexts.Any(x => x.Status == ActivityStatus.Running);

        return hasRunningActivityInstances || hasPendingWork;
    }

    private IActivity? GetRootActivity()
    {
        // Get the first activity that has no inbound connections.
        var query =
            from activity in Activities
            let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
            where !inboundConnections
            select activity;

        var rootActivity = query.FirstOrDefault();
        return rootActivity;
    }

    private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
    {
        var logger = context.GetRequiredService<ILogger<Flowchart>>();
        var flowchartContext = context.TargetContext;
        var completedActivityContext = context.ChildContext;
        var completedActivity = completedActivityContext.Activity;
        var result = context.Result;

        // If the complete activity's status is anything but "Completed", do not schedule its outbound activities.
        var scheduleChildren = completedActivityContext.Status == ActivityStatus.Completed;
        var outcomeNames = result is Outcomes outcomes
            ? outcomes.Names
            : [null!, "Done"];

        // Only query the outbound connections if the completed activity wasn't already completed.
        var outboundConnections = Connections.Where(connection => connection.Source.Activity == completedActivity && outcomeNames.Contains(connection.Source.Port)).ToList();
        var children = outboundConnections.Select(x => x.Target.Activity).ToList();
        var scope = flowchartContext.GetProperty(ScopeProperty, () => new FlowScope());

        scope.RegisterActivityExecution(completedActivity);

        // If the complete activity is a terminal node, complete the flowchart immediately.
        if (completedActivity is ITerminalNode)
        {
            await flowchartContext.CompleteActivityAsync();
        }
        else if (scheduleChildren)
        {
            if (children.Any())
            {
                // Schedule each child, but only if all of its left inbound activities have already executed.
                foreach (var activity in children)
                {
                    var existingActivity = scope.ContainsActivity(activity);
                    scope.AddActivity(activity);

                    var inboundActivities = Connections.LeftInboundActivities(activity).ToList();

                    // If the completed activity is not part of the left inbound path, always allow its children to be scheduled.
                    if (!inboundActivities.Contains(completedActivity))
                    {
                        await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                        continue;
                    }

                    // If the activity is anything but a join activity, only schedule it if all of its left-inbound activities have executed, effectively implementing a "wait all" join. 
                    if (activity is not IJoinNode)
                    {
                        var executionCount = scope.GetExecutionCount(activity);
                        var haveInboundActivitiesExecuted = inboundActivities.All(x => scope.GetExecutionCount(x) > executionCount);

                        if (haveInboundActivitiesExecuted) 
                            await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
                    }
                    else
                    {
                        // Select an existing activity execution context for this activity, if any.
                        var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =>
                            x.ParentActivityExecutionContext == flowchartContext && x.Activity == activity);
                        var scheduleWorkOptions = new ScheduleWorkOptions
                        {
                            CompletionCallback = OnChildCompletedAsync,
                            ExistingActivityExecutionContext = joinContext,
                            PreventDuplicateScheduling = true
                        };

                        if (joinContext != null)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Attaching to existing join context {JoinContext}", activity.Id, joinContext.Id);
                        else if (!existingActivity)
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Creating new join context", activity.Id);
                        else
                        {
                            logger.LogDebug("Next activity {ChildActivityId} is a join activity. Join context was not found, but activity is already being created", activity.Id);
                            continue;
                        }

                        await flowchartContext.ScheduleActivityAsync(activity, scheduleWorkOptions);
                    }
                }
            }

            if (!children.Any())
            {
                await CompleteIfNoPendingWorkAsync(flowchartContext);
            }
        }

        flowchartContext.SetProperty(ScopeProperty, scope);
    }

    private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
    {
        var hasPendingWork = HasPendingWork(context);

        if (!hasPendingWork)
        {
            var hasFaultedActivities = context.GetActiveChildren().Any(x => x.Status == ActivityStatus.Faulted);

            if (!hasFaultedActivities)
            {
                await context.CompleteActivityAsync();
            }
        }
    }

    private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context)
    {
        var flowchartContext = context.ReceiverActivityExecutionContext;
        var schedulingActivityContext = context.SenderActivityExecutionContext;
        var schedulingActivity = schedulingActivityContext.Activity;
        var outcomes = signal.Outcomes;
        var outboundConnections = Connections.Where(connection => connection.Source.Activity == schedulingActivity && outcomes.Contains(connection.Source.Port!)).ToList();
        var outboundActivities = outboundConnections.Select(x => x.Target.Activity).ToList();

        if (outboundActivities.Any())
        {
            // Schedule each child.
            foreach (var activity in outboundActivities) await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedAsync);
        }
    }

    private async ValueTask OnScheduleChildActivityAsync(ScheduleChildActivity signal, SignalContext context)
    {
        var flowchartContext = context.ReceiverActivityExecutionContext;
        var activity = signal.Activity;
        var activityExecutionContext = signal.ActivityExecutionContext;

        if (activityExecutionContext != null)
        {
            await flowchartContext.ScheduleActivityAsync(activityExecutionContext.Activity, new ScheduleWorkOptions
            {
                ExistingActivityExecutionContext = activityExecutionContext,
                CompletionCallback = OnChildCompletedAsync,
                Input = signal.Input
            });
        }
        else
        {
            await flowchartContext.ScheduleActivityAsync(activity, new ScheduleWorkOptions
            {
                CompletionCallback = OnChildCompletedAsync,
                Input = signal.Input
            });
        }
    }

    private async ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
    {
        await CompleteIfNoPendingWorkAsync(context.ReceiverActivityExecutionContext);
    }
}

首先我们从Activity特性中的描述参数中可以看到介绍flowchart作用的一句话:A flowchart is a collection of activities and connections between them.显而易见,flowchart是一个存储了多个Activity和他们连接关系的集合。有了这些数据,flowchart就可以根据connections中的连接关系对activity按照顺序执行了。

Container

接下来我们再往下看,可以看到flowchart不是直接继承Activity的基类,而是继承Container。
Container包含了Activities和Variables两个集合属性,分别用于存储我们的节点集合和变量集合。
在Container的执行入口方法中,先对变量进行了初始化和注册。

protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
    // Ensure variables have names.
    EnsureNames(Variables);

    // Register variables.
    context.ExpressionExecutionContext.Memory.Declare(Variables);

    // Schedule children.
    await ScheduleChildrenAsync(context);
}

在最后调用了一个ScheduleChildrenAsync方法。这里可以看到这个方法是一个虚方法,可以给子类重写。

protected virtual ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
{
    ScheduleChildren(context);
    return ValueTask.CompletedTask;
}

在flowchart中,执行的入口正是这个重写的ScheduleChildrenAsync方法。

Flowchart执行逻辑

回归正题,接下来我们继续看Flowchart的入口,既ScheduleChildrenAsync方法。

protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
{
    var startActivity = GetStartActivity(context);

    if (startActivity == null)
    {
        // Nothing else to execute.
        await context.CompleteActivityAsync();
        return;
    }

    // Schedule the start activity.
    await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
}

先简单过一下这几行的逻辑,首先获取StartActivity,既获取第一个执行的工作流节点,如果获取不到,这结束工作流。
如果获取到了,那么将发起调度,同时传入一个回调函数,这个回调函数是工作流按照顺序执行的关键。

GetStartActivity

那么接下来看它是如何拿到起始节点的呢。

private IActivity? GetStartActivity(ActivityExecutionContext context)
{
    // If there's a trigger that triggered this workflow, use that.
    var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
    var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : default;

    if (triggerActivity != null)
        return triggerActivity;

    // If an explicit Start activity was provided, use that.
    if (Start != null)
        return Start;

    // If there is a Start activity on the flowchart, use that.
    var startActivity = Activities.FirstOrDefault(x => x is Start);

    if (startActivity != null)
        return startActivity;

    // If there's an activity marked as "Can Start Workflow", use that.
    var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());

    if (canStartWorkflowActivity != null)
        return canStartWorkflowActivity;

    // If there is a single activity that has no inbound connections, use that.
    var root = GetRootActivity();

    if (root != null)
        return root;

    // If no start activity found, return the first activity.
    return Activities.FirstOrDefault();
}

这里从开头可以看到,优先级最高的StartActivity竟然不是Star,而是先获取TriggerActivity,那么什么是TriggerActivity呢,就比如我们的HTTP Endpoint, Event, Cron这些,当我们拖到画布当中时,默认会勾选Trigger workflow这个选项,如下图中间最下方所示。至于他的触发原理后续再深入探讨,这里就稍微过一下就好了。
image.png
若是没有TriggerActivity,那么flowchart会判断Start属性是否存在,如果存在表示明确指定了Start节点,那这个节点将作为工作流的起始节点。
若是Start也不存在,则会从所有的Activities中查找第一个Start节点,若存在,则作为工作流起始节点。
若在Activities中也没有Start节点,则再判断一下是否有节点勾选了Start Of Workflow选项,若是勾选了,则获取第一个勾选的Activity作为起始节点。
image.png
若是再没有符合条件的节点,则会尝试获取root节点。

 private IActivity? GetRootActivity()
 {
     // Get the first activity that has no inbound connections.
     var query =
         from activity in Activities
         let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
         where !inboundConnections
         select activity;

     var rootActivity = query.FirstOrDefault();
     return rootActivity;
 }

通过代码我们可以看到,root节点就是Connections连线关系中的第一个节点。
若是一堆节点里面没有任何连线关系,那么最后则会在所有的Activity中取第一个当作入口。

可以看到,获取我们的StartActivity的逻辑还是挺严谨的。

context.ScheduleActivityAsync

好了,获取到了StartActivity之后,接下来就是真正的发起调度了,context.ScheduleActivityAsync方法就是把我们的StartActivity塞进去调度队列,然后会自动执行节点。这执行的逻辑在后面的文章再解析。这个方法关键的是后面那个Callback方法。既OnChildCompletedAsync。

由于OnChildCompletedAsync的逻辑比较复杂,我们放到下一篇文章再继续讲解。

From:https://www.cnblogs.com/fanshaoO/p/18368210
本文地址: http://shuzixingkong.net/article/1242
0评论
提交 加载更多评论
其他文章 《花100块做个摸鱼小网站! 》第三篇—热搜表结构设计和热搜数据存储
⭐️基础链接导航⭐️ ☁️ 阿里云活动地址 &#128031; 上班摸鱼小网站地址 &#128187; 源码库地址 一、前言 大家好呀,我是summo,第一篇已经教会大家怎么去阿里云买服务器,以及怎么搭建JDK、Redis、MySQL这些环境。第二篇我们把后端的应用搭建好了,并且完成了第一个爬虫(抖
《花100块做个摸鱼小网站! 》第三篇—热搜表结构设计和热搜数据存储 《花100块做个摸鱼小网站! 》第三篇—热搜表结构设计和热搜数据存储 《花100块做个摸鱼小网站! 》第三篇—热搜表结构设计和热搜数据存储
.NET8 Blazor 从入门到精通:(三)类库和表单
目录Razor 类库创建使用使可路由组件可从 RCL 获取静态资源表单EditForm标准输入组件验证HTML 表单 Razor 类库 这里只对 RCL 创建和使用的做一些简单的概述,详细内容参考官方文档 使用 Razor 类库 (RCL) 中的 ASP.NET Core Razor 组件。 创建
.NET8 Blazor 从入门到精通:(三)类库和表单 .NET8 Blazor 从入门到精通:(三)类库和表单 .NET8 Blazor 从入门到精通:(三)类库和表单
基于surging的产品项目-木舟开源了!
一 、 概述 因为前段时间电脑坏了,导致代码遗失,踌躇满志马上上线的平台产品付之东流,现在熬夜在写代码希望能尽快推出企业正常使用的平台产品,而这次把代码开源,一是让大家对surging 使用有个深入的了解,二也是开源社区起到监督推动作用,底层的代码基本上已经完成,比如脚本解析,规则引擎,协议组件的热
基于surging的产品项目-木舟开源了! 基于surging的产品项目-木舟开源了! 基于surging的产品项目-木舟开源了!
SimpleRAG:基于WPF与Semantic Kernel实现的一个简单的RAG应用
SimpleRAG介绍 SimpleRAG是基于WPF与Semantic Kernel实现的一个简单的RAG应用,可用于学习与理解如何使用Semantic Kernel构建RAG应用。 GitHub地址:https://github.com/Ming-jiayou/SimpleRAG 主要功能 AI
SimpleRAG:基于WPF与Semantic Kernel实现的一个简单的RAG应用 SimpleRAG:基于WPF与Semantic Kernel实现的一个简单的RAG应用 SimpleRAG:基于WPF与Semantic Kernel实现的一个简单的RAG应用
Python 加载 TensorFlow 模型
TensorFlow支持多种模型格式,但最常见的两种是SavedModel和HDF5(对于Keras模型),本文简要介绍了TensorFlow中最常见的两种是SavedModel和HDF5的模型,并给出了详细的代码示例。
面试场景题:一次关于线程池使用场景的讨论。
你好呀,我是歪歪。 来一起看看一个关于线程池使用场景上的问题,就当是个场景面试题了。 问题是这样的: 字有点多,我直接给你上个图你就懂了: 前端发起一个生成报表页面的请求,这个页面上的数据由后端多个接口返回,另外由于微服务化了,所以数据散落在每个微服务中,因此需要调用多个下游接口拿到数据进行整合。
面试场景题:一次关于线程池使用场景的讨论。 面试场景题:一次关于线程池使用场景的讨论。 面试场景题:一次关于线程池使用场景的讨论。
Java常用类——包装类 小白版个人推荐
包装类及自动装箱/拆箱 包装类是将Java中的八种基本数据类型封装成的类,所有数据类型都能很方便地与对应的包装类相互转换,以解决应用中要求使用数据类型,而不能使用基本数据类型的情况。 int a = 10; //基本类型的数据 Integer b = new Integer(10); //包装类表示
ollama搭建本地ai大模型并应用调用
1、下载ollama 1)https://ollama.com&#160;进入网址,点击download下载2)下载后直接安装即可。 2、启动配置模型 默认是启动cmd窗口直接输入 1 ollama run llama3 启动llama3大模型&#160;或者启动千问大模型 1 ollama run
ollama搭建本地ai大模型并应用调用 ollama搭建本地ai大模型并应用调用 ollama搭建本地ai大模型并应用调用