
ZMQ Integration Protocol

Status: Draft v0.1 · Last Updated: 2026-04-22 · Audience: Python server implementation · Future Jope.SMB client implementation

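Confidential

This document contains proprietary information of Jope Technology Co., Ltd. and is intended solely for the design of the Jope.SMB inference integration. It may not be copied, distributed, or disclosed to third parties without written consent.

This document defines the wire contract between the Jope.SMB WPF Operator Console and the stateless Python Inference Server. It covers the dual-channel ZMQ + REST architecture, the message envelope format, the error handling contract, the performance targets, and the open items that need agreement at the kickoff meeting.

Non-goal: a ZMQ tutorial.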


Architecture

Split rationale

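  • ZMQ REQ-REP · hot path: each Raman scan → prediction, with a round-trip target of ≤ 20 ms p95.
  • HTTP REST · cold path: model load / training trigger / health check; low-frequency management operations, mostly triggered manually.
Why the split

Management operations are idempotent and occasional, and REST + JSON makes them easy to debug. The ZMQ inference socket handles a single message type (predict), which keeps the hot path as lean as possible.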


Transport / Endpoint

| Channel | Protocol | Endpoint | Serializer |
| --- | --- | --- | --- |
| Inference (hot path) | ZMQ REQ-REP | tcp://<inference-host>:5555 | MessagePack |
| Management (cold path) | HTTP/1.1 | http://<inference-host>:5556 | JSON |
Deployment

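The Inference Server always runs on a dedicated plant-LAN host, separate from the Plant IPC. This isolates the inference workload, gives it its own CPU / RAM / optional GPU resources, and keeps Python and model retraining from interfering with the Console's real-time hardware I/O.

Recommended environment: Linux + systemd (the native environment for Python ML workloads), or Docker / Podman (reproducible deployment). Example endpoint: tcp://10.0.1.42:5555. The Console stores the endpoint in a config file, so plant IT can change the Inference Host address without code changes.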

Socket Options · Python Server

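import zmq

ctx = zmq.Context(io_threads=1)
rep = ctx.socket(zmq.REP)
rep.setsockopt(zmq.LINGER, 0)   # discard unsent messages immediately on close
rep.setsockopt(zmq.RCVHWM, 10)  # queue at most 10 inbound messages
rep.bind("tcp://0.0.0.0:5555")  # bind on all interfaces; restrict access via firewall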

Socket Options · C# Client (NetMQ)

var req = new RequestSocket();
req.Options.Linger = TimeSpan.Zero;
req.Options.SendHighWatermark = 10;
req.Connect("tcp://10.0.1.42:5555"); // Inference Host endpoint, read from config

Connection lifecycle: the Console connects at startup and reuses the same socket from then on; the REP server stays listening at all times.


Message Envelope (MessagePack)

Every ZMQ message uses the following envelope structure:

{
"v": 1,
"type": "predict",
"id": "c7f3-...",
"ts": 1713760200.123,
"body": { }
}
  • v: protocol version (int)
  • type: message type (string, see below)
  • id: unique request id (uuid4 string)
  • ts: Unix timestamp (seconds, float)
  • body: type-specific payload

The reply envelope additionally carries correlation_id, which matches the id of the request:

{
"v": 1,
"type": "predict_reply",
"id": "e9a2-...",
"correlation_id": "c7f3-...",
"ts": 1713760200.135,
"body": { },
"error": null
}
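
The two envelopes above can be built with a pair of small helpers. This is a minimal sketch using only the Python standard library; the function names make_request and make_reply are illustrative and are not part of the contract.

```python
import time
import uuid

PROTOCOL_VERSION = 1  # value of "v" in the envelope examples


def make_request(msg_type, body=None):
    """Build a request envelope with a fresh uuid4 id and the current Unix time."""
    return {
        "v": PROTOCOL_VERSION,
        "type": msg_type,
        "id": str(uuid.uuid4()),
        "ts": time.time(),
        "body": body if body is not None else {},
    }


def make_reply(request, reply_type, body=None, error=None):
    """Build a reply envelope whose correlation_id echoes the request id."""
    return {
        "v": PROTOCOL_VERSION,
        "type": reply_type,
        "id": str(uuid.uuid4()),
        "correlation_id": request["id"],
        "ts": time.time(),
        "body": body,
        "error": error,
    }
```

The reply gets its own id, so a log line can refer to either side of the exchange unambiguously while correlation_id ties the pair together.
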
Version handshake

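At startup, the server logs the protocol_version it supports. The client must send a ping before its first predict to confirm version compatibility. On a version mismatch, the Console blocks startup and raises an alarm.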
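
The compatibility check on the first pong might look like the sketch below. It assumes the client supports exactly one protocol version and compares for equality; the exception name ProtocolMismatch and the function check_pong are hypothetical.

```python
SUPPORTED_PROTOCOL_VERSION = 1  # assumption: the client speaks exactly one version


class ProtocolMismatch(Exception):
    """Raised when the pong cannot confirm a compatible protocol version."""


def check_pong(pong):
    """Validate a decoded pong envelope before the first predict is sent."""
    if pong.get("type") != "pong" or pong.get("error") is not None:
        raise ProtocolMismatch("unexpected reply to ping: %r" % (pong.get("type"),))
    server_version = (pong.get("body") or {}).get("protocol_version")
    if server_version != SUPPORTED_PROTOCOL_VERSION:
        raise ProtocolMismatch(
            "server speaks protocol %r, client expects %r"
            % (server_version, SUPPORTED_PROTOCOL_VERSION)
        )
    return server_version
```

A caller would treat ProtocolMismatch as the trigger for blocking startup and raising the alarm.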


ZMQ Message Types

ping / pong · Heartbeat

Request:

{ "v": 1, "type": "ping", "id": "...", "ts": 0.0, "body": {} }

Reply:

{
"v": 1, "type": "pong", "id": "...", "correlation_id": "...", "ts": 0.0,
"body": {
"server_version": "0.3.1",
"protocol_version": 1,
"model_version": "v5",
"uptime_seconds": 3847,
"python_version": "3.11.4"
},
"error": null
}
Cadence

The Console sends a ping every 2 seconds; 3 consecutive timeouts → alarm "Inference offline". The timeout for a single ping is 500 ms.
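
The consecutive-timeout rule can be tracked with a small counter. The sketch below is transport-agnostic: the caller reports whether each ping got its pong within the timeout. The class name HeartbeatMonitor is illustrative.

```python
PING_INTERVAL_S = 2.0       # one ping every 2 seconds
PING_TIMEOUT_S = 0.5        # timeout for a single ping
MAX_CONSECUTIVE_MISSES = 3  # misses before "Inference offline"


class HeartbeatMonitor:
    """Counts consecutive ping timeouts and reports when the alarm should fire."""

    def __init__(self, max_misses=MAX_CONSECUTIVE_MISSES):
        self.max_misses = max_misses
        self.misses = 0

    def record(self, pong_received):
        """Record one ping result; return True while the server counts as offline."""
        if pong_received:
            self.misses = 0  # any successful pong resets the streak
        else:
            self.misses += 1
        return self.misses >= self.max_misses
```

If pings go out on a fixed 2 s schedule, the alarm fires roughly 4.5 s after the first unanswered ping was sent (pings at 0 s, 2 s, and 4 s, plus the last 500 ms timeout).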

predict / predict_reply · Core Inference

Request:

{
"v": 1, "type": "predict", "id": "...", "ts": 0.0,
"body": {
"spectrum": {
"wavenumbers": [200.0, 201.5, 3200.0],
"intensities": [0.0123, 0.0098],
"channel": 1,
"integration_ms": 1000,
"scan_seq": 42847
},
"context": {
"batch_id": "PR-2026-0487",
"port": "extract-E1",
"column_index": 3
}
}
}
  • wavenumbers: 2048 wavenumber values (cm⁻¹), float
  • intensities: 2048 intensity values, float, in the same order as wavenumbers
  • channel: 1 or 2 (RS2000 dual channel)
  • integration_ms: exposure time (ms)
  • scan_seq: Console-assigned scan sequence (for historian join)
  • batch_id / port / column_index: for logging only; the server is stateless

Reply · Success:

{
"v": 1, "type": "predict_reply", "id": "...", "correlation_id": "...", "ts": 0.0,
"body": {
"concentrations": {
"EPA": 5.234,
"DHA": 3.187,
"DPA": 1.823
},
"confidence": {
"EPA": 0.95,
"DHA": 0.92,
"DPA": 0.88
},
"model_version": "v5",
"inference_ms": 8.3
},
"error": null
}
  • concentrations: in g/L
  • confidence: 0 to 1, computed from the Q residual
  • inference_ms: server-side inference time, excluding wire time

Reply · Error:

{
"v": 1, "type": "predict_reply", "id": "...", "correlation_id": "...", "ts": 0.0,
"body": null,
"error": {
"code": "MODEL_NOT_LOADED",
"message": "No active model. Upload and load a model first via REST /model/load.",
"retryable": false
}
}
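
An error reply of this shape could be assembled as follows. This is a sketch: the retryable flags are copied from the Error Codes table in this document, and the helper name make_error_reply is hypothetical.

```python
import time
import uuid

# Retryable flags as listed in the Error Codes table.
RETRYABLE = {
    "MODEL_NOT_LOADED": False,
    "INVALID_SPECTRUM": False,
    "SPECTRUM_OUT_OF_RANGE": False,
    "INTERNAL_ERROR": True,
}


def make_error_reply(request_id, code, message):
    """Build a predict_reply with body null and a populated error object.

    An unknown code raises KeyError so that typos surface during development.
    """
    return {
        "v": 1,
        "type": "predict_reply",
        "id": str(uuid.uuid4()),
        "correlation_id": request_id,
        "ts": time.time(),
        "body": None,
        "error": {"code": code, "message": message, "retryable": RETRYABLE[code]},
    }
```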

Error Codes

| Code | Meaning | Retryable |
| --- | --- | --- |
| MODEL_NOT_LOADED | No active model | No |
| INVALID_SPECTRUM | Wavenumber count mismatch / NaN / negative intensity | No |
| SPECTRUM_OUT_OF_RANGE | Spectrum deviates from training distribution (Q residual too large) | No |
| INTERNAL_ERROR | Unexpected server exception (needs further investigation) | Yes |
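
The three INVALID_SPECTRUM conditions can be checked before any pre-processing runs. The sketch below works on plain Python lists as decoded from MessagePack; the function name and the reason strings are illustrative, and the expected length of 2048 is taken from the predict field description.

```python
import math

EXPECTED_POINTS = 2048  # number of wavenumber / intensity values per scan


def validate_spectrum(spectrum, expected_points=EXPECTED_POINTS):
    """Return None if the spectrum is acceptable, else a short reason string."""
    wavenumbers = spectrum.get("wavenumbers", [])
    intensities = spectrum.get("intensities", [])
    if len(wavenumbers) != expected_points or len(intensities) != expected_points:
        return "wavenumber count mismatch"
    for value in list(wavenumbers) + list(intensities):
        if not isinstance(value, (int, float)) or math.isnan(value):
            return "NaN or non-numeric value"
    if any(value < 0 for value in intensities):
        return "negative intensity"
    return None
```

A non-None result would map to an INVALID_SPECTRUM error reply with retryable set to false.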

Size Budget

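  • Request payload (MessagePack): about 16 KB of raw data (2048 × 2 float32) + envelope; MessagePack prefixes each float32 with a 1-byte type marker, so the encoded size is closer to 20 KB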
  • Reply payload: 約 160 bytes
  • Latency budget: ≤ 20 ms p95 round-trip
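
The request size can be estimated from the MessagePack wire format alone. The sketch below counts only the two spectrum arrays and ignores the envelope and the remaining fields. One thing to verify during implementation: the Python msgpack package encodes Python floats as float64 unless use_single_float=True is passed to the packer, which roughly doubles the array size.

```python
POINTS = 2048
ARRAY16_HEADER = 3   # 0xdc marker + 2-byte length (arrays of 16..65535 elements)
FLOAT32_ENCODED = 5  # 0xca marker + 4 payload bytes
FLOAT64_ENCODED = 9  # 0xcb marker + 8 payload bytes


def arrays_size(bytes_per_float, points=POINTS):
    """Encoded size in bytes of the wavenumbers + intensities arrays."""
    return 2 * (ARRAY16_HEADER + points * bytes_per_float)


raw_float32 = 2 * POINTS * 4               # 16384 bytes of raw float32 data
as_float32 = arrays_size(FLOAT32_ENCODED)  # 20486 bytes on the wire
as_float64 = arrays_size(FLOAT64_ENCODED)  # 36870 bytes on the wire
```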

model_info / model_info_reply · Diagnostics (optional)

Returns metadata for the current model. It is used infrequently: the Console calls it only when switching to the Model Select page.

{
"type": "model_info_reply",
"body": {
"active_version": "v5",
"algorithm": "PLS+Ridge",
"latent_components": 5,
"trained_at": "2026-03-15T10:00:00Z",
"trained_samples": 187,
"rmse": { "EPA": 0.047, "DHA": 0.052, "DPA": 0.061 },
"r2": { "EPA": 0.958, "DHA": 0.942, "DPA": 0.913 },
"wavenumber_range": [200.0, 3200.0],
"wavenumber_count": 2048
}
}

REST Endpoints · Management

Base URL: http://<inference-host>:5556

POST /model/load · Hot-swap active model

Switches the active model without downtime.

// Request
{ "version": "v6.2", "source": "staged" }

// Response 200
{ "loaded_version": "v6.2", "previous_version": "v5", "load_time_ms": 234 }
  • 400: version does not exist / model file is corrupted
  • 409: load in progress (concurrent request)

POST /training/start · Kick off training (async)

Starts a training job asynchronously and returns a job_id.

Request (multipart):
csv — file (training set, Raman + HPLC label)
meta.json — { "source_batches": ["PR-0487"], "operator": "..." }

Response 202:
{ "job_id": "train-2026-04-22-001", "estimated_duration_seconds": 180 }

GET /training/{job_id} · Poll progress

{
"job_id": "train-2026-04-22-001",
"status": "running",
"progress": 0.47,
"result": null
}

status ∈ {queued, running, done, failed}. When status is done: result = { "new_version": "v6.3", "metrics": {...} }
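
A client-side polling loop over these states could be sketched as below. The HTTP call is injected as fetch_status, a hypothetical callable that performs GET /training/{job_id} and returns the decoded JSON, so the loop itself stays independent of any HTTP library. The poll interval and overall timeout are assumed values, not part of the contract.

```python
import time

TERMINAL_STATES = {"done", "failed"}


def wait_for_job(fetch_status, job_id, poll_interval_s=2.0, timeout_s=600.0,
                 sleep=time.sleep):
    """Poll until the job reaches a terminal state or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status(job_id)
        if status["status"] in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "training job %s did not finish in %.0f s" % (job_id, timeout_s)
            )
        sleep(poll_interval_s)
```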

GET /model/list · List all models

{
"active": "v5",
"models": [
{ "version": "v5", "status": "active", "trained_at": "2026-03-15T..." },
{ "version": "v6.2", "status": "staged", "trained_at": "2026-04-20T..." }
]
}

GET /health · Kubernetes-style health check

{ "status": "ok", "model_loaded": true, "uptime_seconds": 3847 }

Error Handling Contract

| Scenario | Console Action |
| --- | --- |
| ZMQ predict timeout (> 500 ms) | Retry 2 times; if it still fails → pause batch + alarm "Inference timeout" |
| predict_reply.error.retryable = true | Retry once |
| predict_reply.error.retryable = false | Surface to operator · an Electronic Signature override is required to continue |
| 3 consecutive ping timeouts | Alarm "Inference offline" + forced batch hold |
| REST 4xx / 5xx | Surface on the Model Select / Training page · does not affect inference |
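
The predict rows of this table can be read as one retry policy. The sketch below is written in Python for illustration only (the Console itself is C#). It makes two assumptions the table does not settle: a retryable error that persists after its single retry is surfaced to the operator, and the timeout and error retry counters are independent. send_predict is a hypothetical callable that raises TimeoutError after 500 ms. With a plain REQ socket, a request that timed out leaves the socket waiting for its reply, so send_predict is assumed to reset or recreate the socket before sending again.

```python
PREDICT_TIMEOUT_RETRIES = 2  # retries after a predict timeout
RETRYABLE_ERROR_RETRIES = 1  # retries when error.retryable is true


def predict_with_retry(send_predict, request):
    """Apply the Console retry rules; return (outcome, reply)."""
    timeouts = 0
    error_retries = 0
    while True:
        try:
            reply = send_predict(request)
        except TimeoutError:
            timeouts += 1
            if timeouts > PREDICT_TIMEOUT_RETRIES:
                return "alarm_timeout", None  # pause batch + "Inference timeout"
            continue
        error = reply.get("error")
        if error is None:
            return "ok", reply
        if error.get("retryable") and error_retries < RETRYABLE_ERROR_RETRIES:
            error_retries += 1
            continue
        return "surface_to_operator", reply  # needs Electronic Signature override
```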

Server errors are written to stderr as structured JSON lines and can be retrieved via the Windows Event Log or REST /logs.
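
A structured JSON line on stderr might be produced as in the sketch below. The field names (ts, level, code, message, correlation_id) are assumptions for illustration; this document does not define a log schema.

```python
import json
import sys
import time


def log_error(code, message, correlation_id=None, stream=sys.stderr):
    """Write one error record as a single JSON line and return that line."""
    record = {
        "ts": time.time(),
        "level": "error",
        "code": code,
        "message": message,
        "correlation_id": correlation_id,
    }
    line = json.dumps(record, ensure_ascii=False)
    stream.write(line + "\n")
    stream.flush()
    return line
```

Including the correlation_id lets a log entry be joined back to the request that caused it.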


Performance Targets

| Metric | Target | Source |
| --- | --- | --- |
| predict round-trip p50 | ≤ 12 ms | |
| predict round-trip p95 | ≤ 20 ms | Architecture spec |
| predict round-trip p99 | ≤ 50 ms | |
| Throughput | 1 scan / 3 s | Raman RS2000 sampling rate |
| ping round-trip p95 | ≤ 5 ms | |
| Model load time | ≤ 1 s | PLS models are small |
| REST availability | 99.9% | Single-machine, no cluster |
Server-side predict budget

MessagePack decode ~1 ms · Pre-process (baseline + SNV + Savitzky-Golay) ~3 ms · PLS+Ridge predict ~2 ms · MessagePack encode ~1 ms · Total server-side ~7 ms, wire ~10 ms → p95 ≤ 20 ms
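
During a load test, the predict percentile targets can be checked against measured round-trip samples. The sketch below uses the nearest-rank percentile definition, which is one of several common conventions; the function names are illustrative.

```python
import math

TARGETS_MS = {50: 12.0, 95: 20.0, 99: 50.0}  # predict round-trip targets


def percentile(samples_ms, pct):
    """Nearest-rank percentile of a non-empty list of latencies in ms."""
    ordered = sorted(samples_ms)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def check_targets(samples_ms):
    """Return {percentile: (measured_ms, target_ms, passed)} for each target."""
    results = {}
    for pct, target in TARGETS_MS.items():
        measured = percentile(samples_ms, pct)
        results[pct] = (measured, target, measured <= target)
    return results
```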


Versioning Policy

  • Breaking changes: bump the v value in the envelope.
  • Handshake: ping / pong exchange protocol_version; on incompatibility, the Console raises an alarm and stops sending predict.
  • Compatibility window: the server maintains backward compatibility for at least one major version.

Python Server

  • pyzmq >= 25
  • msgpack >= 1.0
  • fastapi + uvicorn (for REST)
  • scikit-learn (PLS + Ridge)
  • numpy
Recommended structure

main.py spawns two threads: one for the ZMQ REP loop and one for FastAPI under uvicorn.
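
The two-thread layout can be sketched with the standard library alone. Here zmq_loop and rest_loop are placeholders for the real REP loop and the uvicorn runner; each is assumed to accept a threading.Event and return once it is set. How uvicorn is started and stopped from a non-main thread is left open.

```python
import threading


def run_server(zmq_loop, rest_loop):
    """Start both loops as daemon threads and return (stop_event, threads)."""
    stop = threading.Event()
    threads = [
        threading.Thread(target=zmq_loop, args=(stop,), name="zmq-rep", daemon=True),
        threading.Thread(target=rest_loop, args=(stop,), name="rest", daemon=True),
    ]
    for thread in threads:
        thread.start()
    return stop, threads
```

Because /model/load replaces the model that the REP loop reads, the two threads would need to share the active model through a reference guarded by a lock or swapped in a single assignment.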

C# Client · Future Jope.SMB App

  • NetMQ 4.x
  • MessagePack-CSharp
  • HttpClient (REST)

Development Milestones

Recommended implementation sequence for the Python server:

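| Week | Goal | Detail |
| --- | --- | --- |
| 1 | Mock server | ping + predict return fixed values (EPA: 5.0, DHA: 3.0, DPA: 1.0), so the Jope.SMB client can start work in parallel without being blocked. |
| 2 | Real inference | Integrate PLS+Ridge with the existing pre-process pipeline. |
| 3 | Model hot swap | Implement REST /model/load + Model Registry. |
| 4 | Training pipeline | REST /training/start + /training/{id} query. |
| 5 | Observability | Structured logs, /health, basic metrics. |
| 6 | Hardening | Error paths, timeouts, edge cases, load test. |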
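
The week-1 mock can be reduced to a pure function over decoded envelopes, which makes it testable without a socket. This is a sketch: the placeholder model_version "mock", the inference_ms of 0.0, and the trimmed pong body are assumptions, and the real server would wrap this in the MessagePack + REP loop.

```python
import time
import uuid

FIXED_CONCENTRATIONS = {"EPA": 5.0, "DHA": 3.0, "DPA": 1.0}  # week-1 canned values


def handle_mock(request):
    """Answer ping and predict with canned bodies; no model is loaded."""
    if request["type"] == "ping":
        reply_type = "pong"
        body = {"protocol_version": 1, "model_version": "mock"}  # placeholder values
    elif request["type"] == "predict":
        reply_type = "predict_reply"
        body = {
            "concentrations": dict(FIXED_CONCENTRATIONS),
            "model_version": "mock",
            "inference_ms": 0.0,
        }
    else:
        raise ValueError("mock server does not handle type %r" % request["type"])
    return {
        "v": 1,
        "type": reply_type,
        "id": str(uuid.uuid4()),
        "correlation_id": request["id"],
        "ts": time.time(),
        "body": body,
        "error": None,
    }
```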

Revision Log

| Version | Date | Author | Change |
| --- | --- | --- | --- |
| 0.1 | 2026-04-22 | Hubert | Initial draft based on Jope.SMB.Preview Architecture view |