大家好,我是Edison。
全球电子制造主要集中在中国,面向未来工业4.0、中国制造2025的战略转型升级,互联互通是基础、数据是核心,如何从用户角度来定义设备加工数据的内容完整性、有效性、可扩展性将是工厂通讯连接交换的工作重点。
首先,解释下这两个缩写的意思:IPC是国际电子工业联接协会,CFX是互联工厂数据交换。
IPC-CFX是一个开放式的国际标准,它简化了机器到机器之间的通信。IPC-CFX基于IPC-2591,是全球互联工厂数据交换标准统一定义数据语言格式,我们也可以将其理解为是一个协议,它为制造商、设备、硬件及软件供应商节约沟通和再开发成本,CFX标准的应用将简化并标准化机器设备之间的通信。
总结一下,IPC-CFX就是机器设备之间通信的“统一语言”,是大家都懂的“普通话”而不是“方言”。
话外音:对于IT开发者在追求的DDD(领域驱动设计),其核心思想也是“统一语言”,统一业务和技术之间的语言,提高沟通效率,进而提高软件质量。
目前IPC-CFX主要在电子制造行业得到应用,特别是在 SMT(表面组装技术)行业,越来越多的SMT设备厂商开始加入CFX联盟,比如SMT检测设备知名厂商Koh Young就是其中一员。
IPC-CFX标准不仅适用于 SMT(表面组装技术)的相关生产,也支持机械装配、定制化、包装和运输等上游环节,甚至电气、机械子部件等上游环节。我们使用基于PC-CFX标准的传输信息来开发一些常见的应用例如设备利用率、生产线效率 和 整体设备效率(OEE)指标等。
在IPC-CFX的标准下,设备的数据被定义为制造主题(Topic)和消息结构(Message)。设备不太需要关注数据发送到哪里,数据来源于哪里,只需要知道在什么时机下发送什么数据,收到什么数据执行什么操作即可。
CFX都定义了哪些标准的Topic呢,如下图所示:
以一个Topic "WorkOrdersScheduled"为例,顾名思义:工单已排程。这个Topic代表会发出一个已经排好执行计划的工单,该工单即将在稍后某个特定时间某条生产线如SMT Line 1开始执行生产,其定义的消息数据体如下所示,还是比较完善的:
{ "ScheduledWorkOrders": [ { "WorkOrderIdentifier": { "WorkOrderId": "WO1122334455", "Batch": null }, "Scheduler": { "OperatorIdentifier": "BADGE4486", "ActorType": "Human", "LastName": "Doe", "FirstName": "John", "LoginName": "john.doe@domain1.com" }, "WorkArea": "SMT Line 1", "StartTime": "2018-08-02T11:00:00", "EndTime": "2018-08-02T15:00:00", "ReservedResources": [ "L1PRINTER1", "L1PLACER1", "L1PLACER2", "L1OVEN1" ], "ReservedTools": [ { "UniqueIdentifier": "UID23890430", "Name": "TorqueWrench_123" } ], "ReservedOperators": [ { "OperatorIdentifier": "BADGE489435", "ActorType": "Human", "LastName": "Smith", "FirstName": "Joseph", "LoginName": "joseph.smith@abcdrepairs.com" } ], "ReservedMaterials": [ { "UniqueIdentifier": "UID23849854385", "InternalPartNumber": "PN4452", "Quantity": 0.0 }, { "UniqueIdentifier": "UID23849854386", "InternalPartNumber": "PN4452", "Quantity": 0.0 }, { "UniqueIdentifier": "UID23849854446", "InternalPartNumber": "PN3358", "Quantity": 0.0 } ] } ] }
又如 "WorkOrdersUnschedule" 这个Topic,代表SMT Line 1的某个工单即将被取消,其数据格式就简单很多:
{ "ScheduledWorkOrderIdentifiers": [ { "WorkOrderIdentifier": { "WorkOrderId": "WO1122334455", "Batch": null }, "WorkArea": "SMT Line 1" } ] }
除此之外,CFX还用一个统一的消息信封来包裹这个消息体,我们可以理解为定义了一个如下所示的统一消息格式:
public class CFXEnvelope { public string MessageName {get;set;} // eg. CFX.Produciton.Application.MaterialsApplied public string Version {get;set;} // eg. 1.7 ...... public T MessageBody {get;set;} // 消息体内容:泛型 }
Anyway,这个对于我们IT工程师是比较好理解的,这就跟我们的系统和系统之间通过消息队列(如Kafka)进行发布订阅模式的异步通信一模一样。不过这里呢,是机器与机器,机器与企业之间的通信。
但是,IPC-CFX标准下是基于AMQP协议做消息传输的,每台设备都可以看作是一个AMQP端点,通过发布和订阅实现数据的交互。此外,IPC-CFX还支持点对点(Point-to-Point)的消息模式(请求/响应模式)。
我们都知道,Kafka是不支持AMQP协议的,因此,要使用IPC-CFX就不能直接使用Kafka作为Message Broker,而IPC-CFX官方的案例也都是使用RabbitMQ来写的,虽然我觉得在设备数据交换场景Kafka的性能会更好。
如何让一台台的设备变成符合IPC-CFX标准的AMQP节点呢?常规做法就是在机台侧开发一个程序,这里IPC-CFX组织为我们提供了一个SDK,其实是一个.NET开发包(Nuget安装即可),所以对咱们.NET开发者是十分友好的。
对于.NET 4.6可以使用 CFX.CFXSDK
对于.NET Core及以上可以使用 CFX.CFXSDK.NetStandard
这个SDK提供了以下功能:
将所有CFX消息的用Class/Object表示。
能够将CFX消息对象序列化和反序列化为JSON格式。
能够通过AMQP传输协议将CFX消息发布到一个或多个目的地。
能够通过AMQP传输协议从一个或多个订阅源接收CFX消息。
完全自动化的网络连接故障管理(即使在网络宕机或其他不可靠的情况下保持AMQP连接)。
CFX消息“假脱机”。维护由于网络条件错误而无法传输的CFX消息的持久队列。一旦网络服务恢复,消息将自动按原来的顺序传输。
点对点CFX端点命令(请求/响应)支持。
支持AMQP 1.0 SASL认证和TLS加密。
官方SDK文档传送门:SDK文档
不过,通过学习发现,这个SDK主要还是提供了统一的Topic和Message的数据结构,至于和RabbitMQ的连接,个人感觉不太方便使用,我们完全可以使用其他成熟的RabbitMQ API组件来完成发布和订阅。
接下来,我们来快速实践一下CFX的两种通信方式:发布订阅 和 点对点。
既然IPC-CFX是基于AMQP协议来的,那我们就搭一个RabbitMQ吧。这里我们快速用docker-compose安装一个RabbitMQ。
version: '3' services: rabbitmq1: image: rabbitmq:3.8-management container_name: rabbit-mq-service hostname: rabbit-mq-server ports: - "5672:5672" - "15672:15672" restart: always environment: - RABBITMQ_DEFAULT_USER=rabbit # user account - RABBITMQ_DEFAULT_PASS=EdisonTalk2024 # password volumes: - rabbitmq_data:/var/lib/rabbitmq volumes: rabbitmq_data: driver: local
然后,通过下面的命令把RabbitMQ Run起来:需要注意的点就是需要手动开启AMQP1.0协议!
docker-compose up -d
#进入RabbitMQ容器
docker exec -it rabbit-mq-service /bin/bash
#开启AMQP1.0协议
rabbitmq-plugins enable rabbitmq_amqp1_0
成功运行起来后,能够成功打开RabbitMQ管理界面:
这里我们通过Visual Studio创建一个控制台应用程序,基于.NET Framework 4.8来实现。
首先,安装Nuget包:
其次,完成联接Broker 和 发布Message 的代码:
namespace AMQP.MachineA { /// <summary> /// MachineA: SEWC.SMT.001 /// </summary> public class Program { private const string _machineName = "SDC.SMT.001"; private const string _amqpBroker = "rabbit-mq-server"; // RabbitMQ-Host private const string _amqpUsername = "rabbit"; // RabbitMQ-User private const string _amqpPassword = "rabbit-password"; // RabbitMQ-Password public static void Main(string[] args) { Console.WriteLine($"Current Machine: {_machineName}"); Console.Write($"Current Role: Publisher {Environment.NewLine}"); var connStr = $"host={_amqpBroker};username={_amqpUsername};password={_amqpPassword}"; using (var amqpBus = RabbitHutch.CreateBus(connStr)) { while (true) { Console.WriteLine($"[Info] Starting to send a message to AMQP broker."); // Build a CFX Message of MaterialsApplied var message = new CFXEnvelope(new MaterialsApplied() { TransactionId = Guid.NewGuid(), AppliedMaterials = new List<InstalledMaterial> { new InstalledMaterial() { QuantityInstalled = 1, QuantityNonInstalled = 2 } } }); amqpBus.PubSub.Publish(message); Console.WriteLine($"[Info] Finished to send a message to AMQP broker."); Console.WriteLine("-------------------------------------------------------------------"); Thread.Sleep(1000 * 3); } } } } }
Note:这里只是为了快速演示,实际中账号密码以及Broker地址建议写到配置文件中,并使用AMQPS协议联接,否则你的账号密码会被明文在网络中传输。
参考发布者,仍然创建一个控制台应用程序,安装两个NuGet包。
然后,实现消费者逻辑:
namespace AMQP.MachineB { /// <summary> /// MachineB: SEWC.SMT.002 /// </summary> public class Program { private const string _machineName = "SDC.SMT.002"; private const string _amqpBroker = "rabbit-mq-server"; // RabbitMQ-Host private const string _amqpUsername = "rabbit"; // RabbitMQ-User private const string _amqpPassword = "rabbit-password"; // RabbitMQ-Password public static void Main(string[] args) { Console.WriteLine($"Current Machine: {_machineName}"); Console.WriteLine($"Current Role: Subscriber {Environment.NewLine}"); var connStr = $"host={_amqpBroker};username={_amqpUsername};password={_amqpPassword}"; using (var amqpBus = RabbitHutch.CreateBus(connStr)) { amqpBus.PubSub.Subscribe<CFXEnvelope>(_machineName, message => { if (message.MessageBody is MaterialsApplied) { Console.WriteLine($"[Info] Got a message with topic {message.MessageName} :{Environment.NewLine}{message.ToJson(true)}"); Console.WriteLine("-------------------------------------------------------"); } }); Console.WriteLine("Press any key to exit."); Console.ReadLine(); } } } }
最终的Demo效果如下图所示:
两个控制台应用程序模拟两个机台程序,实现了基于AMQP协议和CFX标准格式的异步通信。但是整体来讲,实现异步通信并不是重点,而是两个机台采用了所谓的“统一语言”。
基于上面的了解,我们知道基于CFX我们还可以让设备之间实现点对点的通信,也可以不通过Broker转发,而且它仍然是基于AMQP协议的。
在点对点模式下,基于CFX SDK,会自动帮你创建一个基于Socket的通信进程,机台程序之间可以互相应答。
(1)机台A
namespace P2P.MachineA { /// <summary> /// MachineA: SEWC.SMT.001 /// </summary> public class Program { private const string _sendCfxHandle = "SDC.SMT.001"; // Sender private const string _receiveCfxHandle = "SDC.SMT.002"; // Receiver private const string _sendRequestUri = "amqp://127.0.0.1:8234"; // Sender private const string _receiveRequestUri = "amqp://127.0.0.1:8235"; // Receiver public static void Main(string[] args) { Console.WriteLine($"Current Machine: {_sendCfxHandle}"); Console.WriteLine($"Current Uri: {_sendRequestUri}"); OpenRequest(); Console.WriteLine("Press Enter Key to start the CFX Sender"); Console.ReadKey(); while (true) { SendRequest(); Thread.Sleep(1000 * 5); // Send message every 5 seconds } } #region AMQP Sender private static AmqpCFXEndpoint _sendRequestEndpoint; private static void OpenRequest() { if (_sendRequestEndpoint != null) { _sendRequestEndpoint.Close(); _sendRequestEndpoint = null; } _sendRequestEndpoint = new AmqpCFXEndpoint(); Console.WriteLine($"[Debug] SendCFXEndpoint.IsOpen : {_sendRequestEndpoint.IsOpen.ToString()}"); _sendRequestEndpoint.Open(_sendCfxHandle, new Uri(_sendRequestUri)); Console.WriteLine($"[Debug] SendCFXEndpoint.IsOpen : {_sendRequestEndpoint.IsOpen.ToString()}"); AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(10 * 2); } private static void SendRequest() { var message = CFXEnvelope.FromCFXMessage(new MaterialsApplied() { TransactionId = Guid.NewGuid(), AppliedMaterials = new List<InstalledMaterial> { new InstalledMaterial() { QuantityInstalled = 1, QuantityNonInstalled = 2 } } }); message.Source = _sendCfxHandle; message.Target = _receiveCfxHandle; message.TimeStamp = DateTime.Now; try { Console.WriteLine($"[Info] Starting to send a message to Target Machine {_receiveCfxHandle}."); var response = _sendRequestEndpoint.ExecuteRequest(_receiveRequestUri, message); Console.WriteLine($"[Info] Target Machine {_receiveCfxHandle} returns : {Environment.NewLine}{response.ToJson(true)}"); } catch (Exception ex) { Console.WriteLine($"[Error] Exception message: {ex.Message}"); } finally { Console.WriteLine("-------------------------------------------------------"); } } #endregion } }
Note:既然是点对点,那发送者就必须要知道接收者的位置。
(2)机台B
namespace P2P.MachineB { /// <summary> /// MachineB: SEWC.SMT.002 /// </summary> public class Program { private const string _receiveCfxHandle = "SDC.SMT.002"; private const string _receiveRequestUri = "amqp://127.0.0.1:8235"; public static void Main(string[] args) { Console.WriteLine($"Current Machine: {_receiveCfxHandle}"); Console.WriteLine($"Current Uri: {_receiveRequestUri}"); OpenListener(); Console.WriteLine("Press Entery Key to end the CFX Listener"); Console.ReadKey(); } #region AMQP Receiver private static AmqpCFXEndpoint _receiveRequestEndpoint; private static void OpenListener() { if (_receiveRequestEndpoint != null) { _receiveRequestEndpoint.Close(); _receiveRequestEndpoint = null; } _receiveRequestEndpoint = new AmqpCFXEndpoint(); _receiveRequestEndpoint.OnRequestReceived -= CFXMessageOnRequestReceived; _receiveRequestEndpoint.OnRequestReceived += CFXMessageOnRequestReceived; Console.WriteLine($"[Debug] SendCFXEndpoint.IsOpen: {_receiveRequestEndpoint.IsOpen.ToString()}"); _receiveRequestEndpoint.Open(_receiveCfxHandle, new Uri(_receiveRequestUri)); Console.WriteLine($"[Debug] SendCFXEndpoint.IsOpen: {_receiveRequestEndpoint.IsOpen.ToString()}"); AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(10 * 2); } private static CFXEnvelope CFXMessageOnRequestReceived(CFXEnvelope message) { Console.WriteLine($"[Info] Got a message from Source Machine {message.Source} :{Environment.NewLine}{message.ToJson(true)}"); Console.WriteLine("-------------------------------------------------------"); var result = (CFXEnvelope)null; if (message.MessageBody is WhoIsThereRequest) { result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse() { CFXHandle = _receiveCfxHandle, RequestNetworkUri = _receiveRequestUri, RequestTargetAddress = "..." }); } else if (message.MessageBody is MaterialsApplied) { result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse() { CFXHandle = _receiveCfxHandle, RequestNetworkUri = _receiveRequestUri, RequestTargetAddress = "..." }); } else { return null; } result.Source = _receiveCfxHandle; result.Target = result.Source; result.TimeStamp = DateTime.Now; return result; } #endregion } }
点对点Demo效果:
本文我们了解了IPC-CFX标准产生的背景 和 用途,它是机器设备之间通信的“统一语言”,是大家都懂的“普通话”而不是“方言”。
首先,IPC-CFX使用AMQP v1.0传输协议实现安全的连接,使用JSON进行数据编码,提供了明确的消息结构和数据内容,确保即插即用。
其次,我们通过两个Demo快速了解了如何实现一个基于CFX标准的机台端应用程序,来实现“统一语言”的设备间通信。
最后,就目前互联网上的资料来看,国内社区对于CFX的应用来看整体都还是不多的,我们也还处于学习阶段,希望未来或许有新的更新分享。
IPC CFX 官方文档:Getting Started with SDK