ZMQ Integration Protocol
Status: Draft v0.1 · Last Updated: 2026-04-22 · Audience: Python server implementation · Future Jope.SMB client implementation
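This document contains proprietary information of Jope Technology Co., Ltd. and is intended solely for the design of the Jope.SMB inference integration. It may not be copied, distributed, or disclosed to third parties without written consent.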
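This document defines the wire contract between the Jope.SMB WPF Operator Console and the stateless Python Inference Server. It covers the dual-channel ZMQ + REST architecture, the message envelope format, the error handling contract, performance targets, and the open items that need consensus at the kickoff meeting.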
Non-goal: a tutorial on using ZMQ.
Architecture
Split rationale
- ZMQ REQ-REP · hot path: one prediction per Raman scan; round-trip target ≤ 20 ms p95.
- HTTP REST · cold path: model load, training trigger, health check; low-frequency administrative operations, mostly triggered manually.
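Management operations are infrequent and, apart from starting a training job, idempotent, so REST + JSON keeps them easy to debug. The ZMQ inference socket carries only predict plus the lightweight ping and optional model_info diagnostics, which keeps the hot path lean.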
Transport / Endpoint
| Channel | Protocol | Endpoint | Serializer |
|---|---|---|---|
| Inference (hot path) | ZMQ REQ-REP | tcp://<inference-host>:5555 | MessagePack |
| Management (cold path) | HTTP/1.1 | http://<inference-host>:5556 | JSON |
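The Inference Server always runs on a dedicated plant-LAN host, separate from the Plant IPC. This isolates the inference workload, gives it its own CPU, RAM, and optional GPU, and keeps Python and model retraining from interfering with the Console's real-time hardware I/O.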
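Recommended environments: Linux + systemd (the native environment for Python ML workloads) or Docker / Podman (for reproducible deployment). Example endpoint: tcp://10.0.1.42:5555. The Console stores the endpoint in a config file, so plant IT can change the Inference Host address without code changes.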
Socket Options · Python Server
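```python
import zmq

ctx = zmq.Context(io_threads=1)     # one I/O thread is enough for a single REP socket
rep = ctx.socket(zmq.REP)
rep.setsockopt(zmq.LINGER, 0)       # do not block on close with unsent replies
rep.setsockopt(zmq.RCVHWM, 10)      # cap the inbound queue at about 10 messages
rep.bind("tcp://0.0.0.0:5555")      # bind on all interfaces; restrict access with the firewall
```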
Socket Options · C# Client (NetMQ)
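```csharp
using System;
using NetMQ;
using NetMQ.Sockets;

var req = new RequestSocket();
req.Options.Linger = TimeSpan.Zero;      // do not block on close with unsent messages
req.Options.SendHighWatermark = 10;
req.Connect("tcp://10.0.1.42:5555");     // Inference Host endpoint, read from config
```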
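Connection lifecycle: the Console connects at startup and then reuses the same socket; the REP server always stays listening. Because a REQ socket enforces strict send/receive alternation, ping and predict sent through it must be serialized, and a REQ socket whose reply timed out cannot send again: it has to be closed and recreated before a retry (the "Lazy Pirate" pattern in the ZeroMQ Guide).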
Message Envelope (MessagePack)
Every ZMQ message uses the following envelope:
```json
{
  "v": 1,
  "type": "predict",
  "id": "c7f3-...",
  "ts": 1713760200.123,
  "body": { }
}
```
- v: protocol version (int)
- type: message type (string; see ZMQ Message Types)
- id: unique request id (uuid4 string)
- ts: Unix timestamp (seconds, float)
- body: type-specific payload
The reply envelope additionally carries correlation_id, which echoes the request's id, and error, which is null on success:
```json
{
  "v": 1,
  "type": "predict_reply",
  "id": "e9a2-...",
  "correlation_id": "c7f3-...",
  "ts": 1713760200.135,
  "body": { },
  "error": null
}
```
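A minimal sketch of envelope construction on the Python side, assuming plain dicts that are later handed to msgpack for packing. The helper names make_request and make_reply are hypothetical and not part of any library; they only show how id, ts, and correlation_id relate.

```python
import time
import uuid
from typing import Optional

PROTOCOL_VERSION = 1  # envelope "v"


def make_request(msg_type: str, body: dict) -> dict:
    """Hypothetical helper: build a request envelope with a fresh uuid4 id."""
    return {
        "v": PROTOCOL_VERSION,
        "type": msg_type,
        "id": str(uuid.uuid4()),
        "ts": time.time(),          # Unix seconds as float
        "body": body,
    }


def make_reply(request: dict, reply_type: str,
               body: Optional[dict] = None,
               error: Optional[dict] = None) -> dict:
    """Hypothetical helper: build a reply whose correlation_id echoes request["id"]."""
    return {
        "v": PROTOCOL_VERSION,
        "type": reply_type,
        "id": str(uuid.uuid4()),    # the reply gets its own id
        "correlation_id": request["id"],
        "ts": time.time(),
        "body": body,
        "error": error,             # None on success
    }
```

The reply deliberately gets a new id of its own; matching is done only through correlation_id.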
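At startup the server logs the protocol_version it supports. Before the first predict, the client must send a ping to confirm version compatibility. On a version mismatch, the Console blocks startup and raises an alarm.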
ZMQ Message Types
ping / pong · Heartbeat
Request:
```json
{ "v": 1, "type": "ping", "id": "...", "ts": 0.0, "body": {} }
```
Reply:
```json
{
  "v": 1, "type": "pong", "id": "...", "correlation_id": "...", "ts": 0.0,
  "body": {
    "server_version": "0.3.1",
    "protocol_version": 1,
    "model_version": "v5",
    "uptime_seconds": 3847,
    "python_version": "3.11.4"
  },
  "error": null
}
```
The Console sends a ping every 2 s with a 500 ms timeout per ping; 3 consecutive timeouts raise the alarm "Inference offline".
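The heartbeat rule reduces to counting consecutive timeouts. This is a sketch of that counter only, assuming the caller performs the 2 s / 500 ms ping itself and reports each outcome; the class name HeartbeatMonitor and its on_offline callback are hypothetical.

```python
from typing import Callable

MAX_MISSED_PINGS = 3  # 3 consecutive timeouts -> alarm "Inference offline"


class HeartbeatMonitor:
    """Hypothetical: count consecutive ping timeouts, alarm once per outage.

    The caller pings every 2 s with a 500 ms timeout and passes each
    outcome to record(); the ping transport itself is not shown.
    """

    def __init__(self, on_offline: Callable[[], None]) -> None:
        self.on_offline = on_offline
        self.missed = 0
        self.offline = False

    def record(self, got_pong: bool) -> None:
        if got_pong:
            self.missed = 0      # any pong resets the streak
            self.offline = False
            return
        self.missed += 1
        if self.missed >= MAX_MISSED_PINGS and not self.offline:
            self.offline = True
            self.on_offline()    # e.g. raise the alarm and force a batch hold
```

The offline flag keeps a long outage from re-firing the alarm on every missed ping.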
predict / predict_reply · Core Inference
Request:
```json
{
  "v": 1, "type": "predict", "id": "...", "ts": 0.0,
  "body": {
    "spectrum": {
      "wavenumbers": [200.0, 201.5, ..., 3200.0],
      "intensities": [0.0123, 0.0098, ...],
      "channel": 1,
      "integration_ms": 1000,
      "scan_seq": 42847
    },
    "context": {
      "batch_id": "PR-2026-0487",
      "port": "extract-E1",
      "column_index": 3
    }
  }
}
```
- wavenumbers: 2048 wavenumber values (cm⁻¹, float)
- intensities: 2048 intensity values (float), in the same order as wavenumbers
- channel: 1 or 2 (RS2000 dual channel)
- integration_ms: exposure time (ms)
- scan_seq: Console-assigned scan sequence number (for the historian join)
- batch_id / port / column_index: logging only; the server stays stateless
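The INVALID_SPECTRUM checks can be applied before any pre-processing. This sketch mirrors that error-code row (count mismatch, NaN, negative intensity); the function name validate_spectrum is hypothetical, and rejecting channels other than 1 or 2 is an added assumption based on the RS2000 dual-channel note.

```python
import math
from typing import Optional

N_POINTS = 2048  # wavenumber count stated in this spec


def validate_spectrum(spectrum: dict, n_points: int = N_POINTS) -> Optional[str]:
    """Hypothetical check: return an error code for body.spectrum, or None if valid."""
    wn = spectrum.get("wavenumbers", [])
    it = spectrum.get("intensities", [])
    # Count mismatch: both arrays must hold exactly n_points values.
    if len(wn) != n_points or len(it) != n_points:
        return "INVALID_SPECTRUM"
    # NaN anywhere in either array.
    if any(math.isnan(x) for x in wn) or any(math.isnan(y) for y in it):
        return "INVALID_SPECTRUM"
    # Negative intensity.
    if any(y < 0 for y in it):
        return "INVALID_SPECTRUM"
    # Assumption: only the two RS2000 channels are valid.
    if spectrum.get("channel") not in (1, 2):
        return "INVALID_SPECTRUM"
    return None
```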
Reply · Success:
```json
{
  "v": 1, "type": "predict_reply", "id": "...", "correlation_id": "...", "ts": 0.0,
  "body": {
    "concentrations": {
      "EPA": 5.234,
      "DHA": 3.187,
      "DPA": 1.823
    },
    "confidence": {
      "EPA": 0.95,
      "DHA": 0.92,
      "DPA": 0.88
    },
    "model_version": "v5",
    "inference_ms": 8.3
  },
  "error": null
}
```
- concentrations: in g/L
- confidence: 0 to 1, derived from the Q residual
- inference_ms: server-side inference time, excluding wire time
Reply · Error:
```json
{
  "v": 1, "type": "predict_reply", "id": "...", "correlation_id": "...", "ts": 0.0,
  "body": null,
  "error": {
    "code": "MODEL_NOT_LOADED",
    "message": "No active model. Upload and load a model first via REST /model/load.",
    "retryable": false
  }
}
```
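On the client, a predict_reply is either concentrations or an error object. A sketch of unpacking it, assuming a correlation_id mismatch should be treated as a protocol fault (raised here as ValueError); InferenceError and parse_predict_reply are hypothetical names.

```python
from typing import Dict


class InferenceError(Exception):
    """Hypothetical: carries error.code and error.retryable from a predict_reply."""

    def __init__(self, code: str, message: str, retryable: bool) -> None:
        super().__init__(f"{code}: {message}")
        self.code = code
        self.retryable = retryable


def parse_predict_reply(reply: dict, request_id: str) -> Dict[str, float]:
    """Return concentrations (g/L) from a predict_reply, or raise."""
    if reply.get("correlation_id") != request_id:
        # Assumption: a reply for another request is a protocol fault.
        raise ValueError("correlation_id does not match the request id")
    err = reply.get("error")
    if err is not None:
        raise InferenceError(err["code"], err.get("message", ""),
                             bool(err.get("retryable")))
    return dict(reply["body"]["concentrations"])
```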
Error Codes
| Code | Meaning | Retryable |
|---|---|---|
| MODEL_NOT_LOADED | No active model | No |
| INVALID_SPECTRUM | Wavenumber count mismatch / NaN / negative intensity | No |
| SPECTRUM_OUT_OF_RANGE | Spectrum deviates from training distribution (Q residual too large) | No |
| INTERNAL_ERROR | Unexpected server exception (needs investigation) | Yes |
Size Budget
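- Request payload (MessagePack): the two 2048-point arrays are 16 KB as raw float32 (2048 × 2 × 4 bytes), about 20 KB when packed as MessagePack float32 values (5 bytes each), and about 36 KB as float64 (9 bytes each), which msgpack-python uses for Python float unless use_single_float=True; the envelope adds well under 1 KB
- Reply payload: roughly 300 bytes (the two uuid4 strings in the envelope alone take about 76 bytes)
- Latency budget: ≤ 20 ms p95 round-trip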
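The size figures follow from the MessagePack format rules: float 32 is a 1-byte marker plus 4 bytes, float 64 a marker plus 8 bytes, and an array of 16 to 65535 items uses a 3-byte header. This stdlib-only sketch counts only the two spectrum arrays (no envelope keys), and also shows why a global use_single_float=True would be risky: it narrows ts as well, and float32 has a 24-bit significand.

```python
import struct

N = 2048  # points per spectrum


def msgpack_array_header(n: int) -> int:
    """Bytes for a MessagePack array header holding n items."""
    if n <= 15:
        return 1      # fixarray
    if n <= 0xFFFF:
        return 3      # array 16: 0xdc + uint16 length
    return 5          # array 32: 0xdd + uint32 length


def float_arrays_size(n: int, single: bool, arrays: int = 2) -> int:
    """Encoded size of `arrays` float arrays with n values each."""
    per_value = 5 if single else 9   # float 32: 0xca + 4 B; float 64: 0xcb + 8 B
    return arrays * (msgpack_array_header(n) + n * per_value)


raw_float32 = 2 * N * 4
as_float32 = float_arrays_size(N, single=True)
as_float64 = float_arrays_size(N, single=False)
print(raw_float32, as_float32, as_float64)  # 16384 20486 36870

# Narrowing a 2026 Unix timestamp to float32 rounds it to a multiple of 128 s.
ts = 1713760200.123
ts_f32 = struct.unpack("<f", struct.pack("<f", ts))[0]
print(ts_f32)  # 1713760256.0
```

If the payload needs to shrink, narrowing only the spectrum arrays (for example, sending them as float32 bytes) avoids damaging ts; which option to choose is a design decision this sketch does not make.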
model_info / model_info_reply · Diagnostics (optional)
Returns the current model metadata. It is called rarely, only when the Console switches to the Model Select page.
```json
{
  "type": "model_info_reply",
  "body": {
    "active_version": "v5",
    "algorithm": "PLS+Ridge",
    "latent_components": 5,
    "trained_at": "2026-03-15T10:00:00Z",
    "trained_samples": 187,
    "rmse": { "EPA": 0.047, "DHA": 0.052, "DPA": 0.061 },
    "r2": { "EPA": 0.958, "DHA": 0.942, "DPA": 0.913 },
    "wavenumber_range": [200.0, 3200.0],
    "wavenumber_count": 2048
  }
}
```
REST Endpoints · Management
Base URL: http://<inference-host>:5556
POST /model/load · Hot-swap active model
Switches the active model without downtime.
```json
// Request
{ "version": "v6.2", "source": "staged" }
// Response 200
{ "loaded_version": "v6.2", "previous_version": "v5", "load_time_ms": 234 }
```
- 400: version does not exist, or the model file is corrupt
- 409: load in progress (concurrent request)
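One way to get a no-downtime swap with the 409 rule is to load the new model off to the side and then rebind a single reference. A sketch under these assumptions: the loader callable is hypothetical and raises FileNotFoundError or ValueError for a missing or corrupt model, the {"detail": ...} error body shape is an assumption, and the predict thread reads registry.active once per request.

```python
import threading
import time
from typing import Any, Callable, Dict, Optional, Tuple


class ModelRegistry:
    """Hypothetical holder for the active model, swapped by POST /model/load."""

    def __init__(self, loader: Callable[[str], Any]) -> None:
        self._loader = loader                  # hypothetical: reads a staged model file
        self._load_lock = threading.Lock()     # one load at a time
        self.active: Optional[Tuple[str, Any]] = None  # (version, model)

    def load(self, version: str) -> Tuple[int, Dict[str, Any]]:
        """Return (HTTP status, JSON body) following the /model/load contract."""
        if not self._load_lock.acquire(blocking=False):
            return 409, {"detail": "load in progress"}
        try:
            start = time.perf_counter()
            try:
                model = self._loader(version)  # slow part; predict still uses the old model
            except (FileNotFoundError, ValueError) as exc:
                return 400, {"detail": str(exc)}
            previous = self.active[0] if self.active else None
            # Rebinding one attribute is atomic under CPython's GIL, so a reader
            # sees either the old (version, model) pair or the new one.
            self.active = (version, model)
            load_ms = round((time.perf_counter() - start) * 1000)
            return 200, {"loaded_version": version,
                         "previous_version": previous,
                         "load_time_ms": load_ms}
        finally:
            self._load_lock.release()
```

Keeping version and model in one tuple means a predict_reply can never report a model_version that differs from the model that produced it.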
POST /training/start · Kick off training (async)
Starts a training job asynchronously and returns a job_id.
Request (multipart):
- csv: file (training set, Raman spectra + HPLC labels)
- meta.json: { "source_batches": ["PR-0487"], "operator": "..." }
Response 202:
```json
{ "job_id": "train-2026-04-22-001", "estimated_duration_seconds": 180 }
```
GET /training/{job_id} · Poll progress
```json
{
  "job_id": "train-2026-04-22-001",
  "status": "running",
  "progress": 0.47,
  "result": null
}
```
status ∈ {queued, running, done, failed}. When status is done, result = { "new_version": "v6.3", "metrics": {...} }.
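A client-side polling loop for this endpoint, as a sketch: fetch_status is a hypothetical callable wrapping GET /training/{job_id}, and the 2 s interval and 600 s overall timeout are assumed values chosen with estimated_duration_seconds = 180 in mind. The sleep function is injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Any, Callable, Dict

TERMINAL = {"done", "failed"}


def wait_for_training(fetch_status: Callable[[str], Dict[str, Any]], job_id: str,
                      interval_s: float = 2.0, timeout_s: float = 600.0,
                      sleep: Callable[[float], None] = time.sleep) -> Dict[str, Any]:
    """Poll until the job reaches done or failed; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout_s
    while True:
        job = fetch_status(job_id)       # hypothetical: GET /training/{job_id}
        if job["status"] in TERMINAL:
            return job                   # caller inspects job["result"]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{job_id} still {job['status']} after {timeout_s} s")
        sleep(interval_s)
```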
GET /model/list · List all models
```json
{
  "active": "v5",
  "models": [
    { "version": "v5", "status": "active", "trained_at": "2026-03-15T..." },
    { "version": "v6.2", "status": "staged", "trained_at": "2026-04-20T..." }
  ]
}
```
GET /health · Kubernetes-style health check
```json
{ "status": "ok", "model_loaded": true, "uptime_seconds": 3847 }
```
Error Handling Contract
| Scenario | Console Action |
|---|---|
| ZMQ predict timeout (> 500 ms) | Retry twice; if it still fails, pause the batch and raise alarm "Inference timeout" |
| predict_reply.error.retryable = true | Retry once |
| predict_reply.error.retryable = false | Surface to the operator; continuing requires an Electronic Signature override |
| 3 consecutive ping timeouts | Alarm "Inference offline" + forced batch hold |
| REST 4xx / 5xx | Surface on the Model Select / Training page; inference is unaffected |
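Server errors are written to stderr as structured JSON lines. On the recommended Linux hosts they land in the systemd journal (or the Docker / Podman container log) and can also be pulled through REST /logs.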
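The predict rows of this table can be expressed as one retry policy. A sketch under stated assumptions: ReqChannel and connect are hypothetical wrappers around a NetMQ or pyzmq REQ socket whose request() raises TimeoutError after timeout_s, and InferenceTimeout is a hypothetical signal for "pause batch + alarm". Because a REQ socket stuck waiting for a lost reply will not accept another send, each timeout retry uses a freshly connected channel.

```python
from typing import Callable, Protocol


class ReqChannel(Protocol):
    """Hypothetical wrapper around one REQ socket."""

    def request(self, msg: dict, timeout_s: float) -> dict: ...  # raises TimeoutError
    def close(self) -> None: ...


class InferenceTimeout(Exception):
    """Hypothetical: Console should pause the batch and alarm "Inference timeout"."""


def predict_with_policy(connect: Callable[[], ReqChannel], msg: dict,
                        timeout_s: float = 0.5, timeout_retries: int = 2,
                        error_retries: int = 1) -> dict:
    """Apply the Error Handling Contract to one predict request."""
    channel = connect()
    timeouts = errors = 0
    try:
        while True:
            try:
                reply = channel.request(msg, timeout_s)
            except TimeoutError:
                timeouts += 1
                if timeouts > timeout_retries:
                    raise InferenceTimeout("Inference timeout")
                # Lazy Pirate: discard the stuck REQ socket and reconnect.
                channel.close()
                channel = connect()
                continue
            error = reply.get("error")
            if error and error.get("retryable") and errors < error_retries:
                errors += 1      # retryable server error: one more attempt
                continue
            return reply         # success, or an error to surface to the operator
    finally:
        channel.close()
```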
Performance Targets
| Metric | Target | Source |
|---|---|---|
| predict round-trip p50 | ≤ 12 ms | |
| predict round-trip p95 | ≤ 20 ms | Architecture spec |
| predict round-trip p99 | ≤ 50 ms | |
| Throughput | 1 scan / 3 s | Raman RS2000 sampling rate |
| ping round-trip p95 | ≤ 5 ms | |
| Model load time | ≤ 1 s | PLS models are small |
| REST availability | 99.9% | Single-machine, no cluster |
Conservative per-request budget: MessagePack decode ~1 ms · pre-processing (baseline + SNV + Savitzky-Golay) ~3 ms · PLS+Ridge predict ~2 ms · MessagePack encode ~1 ms, for ~7 ms server-side. With ~10 ms on the wire the round-trip is ~17 ms, about 3 ms inside the p95 ≤ 20 ms target.
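The budget itself is simple addition (1 + 3 + 2 + 1 = 7 ms server-side, plus ~10 ms wire). Checking measured round-trips against the p50/p95/p99 rows needs percentiles; a sketch using the standard library's statistics.quantiles (Python 3.8+), assuming the samples come from Console-side timings exported as milliseconds. The function name check_latency is hypothetical.

```python
import statistics
from typing import Dict, Sequence

TARGETS_MS = {"p50": 12.0, "p95": 20.0, "p99": 50.0}  # predict round-trip targets


def check_latency(samples_ms: Sequence[float]) -> Dict[str, bool]:
    """Return, per percentile, whether measured round-trips meet the target."""
    # n=100 yields 99 cut points; cut point k-1 is the k-th percentile.
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    measured = {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}
    return {name: measured[name] <= limit for name, limit in TARGETS_MS.items()}
```

method="inclusive" treats the samples as the whole population, so the slowest observed request bounds the p99 estimate.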
Versioning Policy
- Breaking changes: bump v in the envelope.
- Handshake: ping/pong exchanges protocol_version; if the versions are incompatible, the Console raises an alarm and stops sending predict.
- Compatibility window: the server stays backward compatible with at least one previous major version.
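The handshake and the compatibility window can be captured in two predicates. A sketch assuming the window is exactly one previous envelope version and that both sides know it; server_accepts and client_compatible are hypothetical names, and a client newer than the server is rejected.

```python
SERVER_PROTOCOL_VERSION = 1
COMPAT_WINDOW = 1  # assumption: server also accepts the previous envelope version


def server_accepts(v: int, current: int = SERVER_PROTOCOL_VERSION,
                   window: int = COMPAT_WINDOW) -> bool:
    """Server side: accept envelopes from (current - window) up to current."""
    return max(1, current - window) <= v <= current


def client_compatible(pong_body: dict, client_v: int) -> bool:
    """Console side: after ping/pong, may this client start sending predict?"""
    return server_accepts(client_v, current=pong_body["protocol_version"])
```

When client_compatible returns False, the Console raises the alarm and never sends predict.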
Recommended Stacks
Python Server
- pyzmq >= 25
- msgpack >= 1.0
- fastapi + uvicorn (for REST)
- scikit-learn (PLS + Ridge)
- numpy
main.py spawns two threads: one running the ZMQ REP loop and one running FastAPI under uvicorn.
C# Client · Future Jope.SMB App
- NetMQ 4.x
- MessagePack-CSharp
- HttpClient (REST)
Development Milestones
Recommended implementation sequence for the Python server:
| Week | Goal | Detail |
|---|---|---|
| 1 | Mock server | ping + predict return fixed values (EPA: 5.0, DHA: 3.0, DPA: 1.0), so Jope.SMB client work can start in parallel without being blocked. |
| 2 | Real inference | Integrate PLS+Ridge with the existing pre-processing pipeline. |
| 3 | Model hot swap | Implement REST /model/load + Model Registry. |
| 4 | Training pipeline | REST /training/start + /training/{id} polling. |
| 5 | Observability | Structured logs, /health, basic metrics. |
| 6 | Hardening | Error paths, timeouts, edge cases, load testing. |
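The Week 1 mock server comes down to a dispatcher from request envelope to reply envelope. A stdlib-only sketch: the handle function, the "0.0.1-mock" version string, the placeholder confidence of 1.0, and the "UNKNOWN_TYPE" error code are all hypothetical (that code is not in the error-code table). Unknown types still get a reply because a REP socket must answer every request before it can receive the next one.

```python
import platform
import time
import uuid

START = time.monotonic()
MOCK_CONCENTRATIONS = {"EPA": 5.0, "DHA": 3.0, "DPA": 1.0}  # Week 1 fixed values


def handle(request: dict) -> dict:
    """Hypothetical: map one decoded request envelope to a reply (Week 1 mock)."""

    def reply(reply_type: str, body=None, error=None) -> dict:
        return {"v": 1, "type": reply_type, "id": str(uuid.uuid4()),
                "correlation_id": request.get("id"), "ts": time.time(),
                "body": body, "error": error}

    kind = request.get("type")
    if kind == "ping":
        return reply("pong", {
            "server_version": "0.0.1-mock",          # hypothetical mock version
            "protocol_version": 1,
            "model_version": "mock",
            "uptime_seconds": int(time.monotonic() - START),
            "python_version": platform.python_version(),
        })
    if kind == "predict":
        return reply("predict_reply", {
            "concentrations": dict(MOCK_CONCENTRATIONS),
            "confidence": {name: 1.0 for name in MOCK_CONCENTRATIONS},  # placeholder
            "model_version": "mock",
            "inference_ms": 0.0,
        })
    # REP must answer every request; "UNKNOWN_TYPE" is a hypothetical code.
    return reply("error", error={"code": "UNKNOWN_TYPE",
                                 "message": f"unsupported message type {kind!r}",
                                 "retryable": False})
```

In the actual REP loop, each iteration would roughly be rep.send(msgpack.packb(handle(msgpack.unpackb(rep.recv())))); that wiring is omitted here so the dispatcher can be tested without a socket.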
Revision Log
| Version | Date | Author | Change |
|---|---|---|---|
| 0.1 | 2026-04-22 | Hubert | Initial draft based on Jope.SMB.Preview Architecture view |