ZMQ Integration Protocol
Status: Draft v0.1 · Last Updated: 2026-04-22 · Audience: Python server implementation · Future Jope.SMB client implementation
This document contains proprietary information of Jope Technology Co., Ltd. It is provided solely for the purpose of designing the Jope.SMB inference integration and may not be reproduced, distributed, or disclosed to third parties without prior written consent.
Wire contract between the Jope.SMB WPF Operator Console and the stateless Python Inference Server. Defines the ZMQ + REST dual-channel protocol, message envelope format, error contract, performance targets, and open items for the kickoff meeting.
Non-goal: Teaching ZMQ basics.
Architecture
Split rationale
- ZMQ REQ-REP · hot path — Every Raman scan → prediction, round-trip budget ≤ 20 ms p95.
- HTTP REST · cold path — Model load / training trigger / health check; low-frequency, mostly human-initiated operations.
Management ops are idempotent and debug-friendly over REST + JSON. The ZMQ inference socket then serves only a single message type (predict), keeping the hot path simple.
Transport / Endpoint
| Channel | Protocol | Endpoint | Serializer |
|---|---|---|---|
| Inference (hot) | ZMQ REQ-REP | tcp://127.0.0.1:5555 | MessagePack |
| Management (cold) | HTTP/1.1 | http://127.0.0.1:5556 | JSON |
Inference Server runs as a Windows Service. It can be deployed on the same Plant IPC as Jope.SMB (loopback, lowest latency), or on a separate machine (more headroom for larger models / parallel inference) — choose based on plant performance requirements and hardware resources.
The endpoint tcp://127.0.0.1:5555 shown above is for the same-machine case. For cross-machine deployment, bind the server to the appropriate NIC address (e.g., tcp://10.0.1.42:5555) and update the Console's connection string accordingly. Protocol semantics are identical in both topologies.
Socket Options · Python Server
import zmq

ctx = zmq.Context(io_threads=1)
rep = ctx.socket(zmq.REP)
rep.setsockopt(zmq.LINGER, 0)   # discard pending messages on close
rep.setsockopt(zmq.RCVHWM, 10)  # bound the receive queue (REQ-REP is lock-step anyway)
rep.bind("tcp://127.0.0.1:5555")
Socket Options · C# Client (NetMQ)
var req = new RequestSocket();
req.Options.Linger = TimeSpan.Zero;  // do not block on close
req.Options.SendHighWatermark = 10;  // bound the send queue
req.Connect("tcp://127.0.0.1:5555");
Connection lifecycle: Console connects on startup and reuses the socket. The REP server listens indefinitely.
Message Envelope (MessagePack)
All ZMQ messages use the following envelope structure:
{
"v": 1,
"type": "predict",
"id": "c7f3-...",
"ts": 1713760200.123,
"body": { }
}
- `v` — protocol version (int)
- `type` — message type (string, see below)
- `id` — unique request id (uuid4 string)
- `ts` — unix timestamp (float seconds)
- `body` — type-specific payload
Reply envelopes include correlation_id matching the request id:
{
"v": 1,
"type": "predict_reply",
"id": "e9a2-...",
"correlation_id": "c7f3-...",
"ts": 1713760200.135,
"body": { },
"error": null
}
Server logs its supported protocol_version on startup. The Console must exchange a ping before the first predict to verify compatibility. On mismatch, the Console blocks startup and raises an alarm.
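The startup compatibility check reduces to comparing the server's advertised version against the Console's own. A minimal sketch, assuming the pong body shape defined below (function name is illustrative):

```python
def protocol_compatible(pong_body: dict, expected_version: int = 1) -> bool:
    """Compare the server's advertised protocol_version against ours.

    Run once on Console startup, after the first ping/pong and before
    any predict. On False the Console blocks startup and alarms.
    """
    return pong_body.get("protocol_version") == expected_version
```

A missing `protocol_version` field is treated the same as a mismatch, so a pre-handshake server also fails safely.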
ZMQ Message Types
ping / pong · Heartbeat
Request:
{ "v": 1, "type": "ping", "id": "...", "ts": 0.0, "body": {} }
Reply:
{
"v": 1, "type": "pong", "id": "...", "correlation_id": "...", "ts": 0.0,
"body": {
"server_version": "0.3.1",
"protocol_version": 1,
"model_version": "v5",
"uptime_seconds": 3847,
"python_version": "3.11.4"
},
"error": null
}
Console sends ping every 2 seconds. Three consecutive timeouts → alarm "Inference offline". Per-ping timeout: 500 ms.
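The three-strike rule is a small piece of state. A sketch of the client-side monitor (shown in Python for brevity; the real Console would implement it in C#, and the class name is illustrative):

```python
class HeartbeatMonitor:
    """Track consecutive ping timeouts; alarm after `max_misses` strikes."""

    def __init__(self, max_misses: int = 3):
        self.max_misses = max_misses
        self.misses = 0

    def on_pong(self) -> None:
        """A pong arrived within the 500 ms window: reset the counter."""
        self.misses = 0

    def on_timeout(self) -> bool:
        """A ping timed out. Returns True when the offline alarm should fire."""
        self.misses += 1
        return self.misses >= self.max_misses
```

Resetting on every pong means only *consecutive* timeouts count, so a single dropped ping during a GC pause does not trip the alarm.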
predict / predict_reply · Core Inference
Request:
{
"v": 1, "type": "predict", "id": "...", "ts": 0.0,
"body": {
"spectrum": {
"wavenumbers": [200.0, 201.5, 3200.0],
"intensities": [0.0123, 0.0098],
"channel": 1,
"integration_ms": 1000,
"scan_seq": 42847
},
"context": {
"batch_id": "PR-2026-0487",
"port": "extract-E1",
"column_index": 3
}
}
}
- `wavenumbers` — 2048 floats in cm⁻¹
- `intensities` — 2048 floats, same ordering as `wavenumbers`
- `channel` — 1 or 2 (RS2000 dual channel)
- `integration_ms` — exposure time (ms)
- `scan_seq` — Console-assigned scan sequence (for historian join)
- `batch_id` / `port` / `column_index` — logging only; the server stays stateless
Reply (success):
{
"v": 1, "type": "predict_reply", "id": "...", "correlation_id": "...", "ts": 0.0,
"body": {
"concentrations": {
"EPA": 5.234,
"DHA": 3.187,
"DPA": 1.823
},
"confidence": {
"EPA": 0.95,
"DHA": 0.92,
"DPA": 0.88
},
"model_version": "v5",
"inference_ms": 8.3
},
"error": null
}
- `concentrations` — g/L
- `confidence` — 0–1, Q residual-based
- `inference_ms` — server-side only (excludes wire time)
Reply (error):
{
"v": 1, "type": "predict_reply", "id": "...", "correlation_id": "...", "ts": 0.0,
"body": null,
"error": {
"code": "MODEL_NOT_LOADED",
"message": "No active model. Upload and load a model first via REST /model/load.",
"retryable": false
}
}
Error Codes
| Code | Meaning | Retryable |
|---|---|---|
| `MODEL_NOT_LOADED` | No active model | No |
| `INVALID_SPECTRUM` | Wavenumber count mismatch / NaN / negative intensity | No |
| `SPECTRUM_OUT_OF_RANGE` | Spectrum deviates from training distribution (Q residual too large) | No |
| `INTERNAL_ERROR` | Unexpected server exception (log for investigation) | Yes |
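Keeping the retryability of each code in one lookup table prevents the server and its error replies from drifting apart. A sketch (constant and helper names are illustrative):

```python
# Retryability per error code, mirroring the table above.
RETRYABLE = {
    "MODEL_NOT_LOADED": False,
    "INVALID_SPECTRUM": False,
    "SPECTRUM_OUT_OF_RANGE": False,
    "INTERNAL_ERROR": True,
}

def make_error(code: str, message: str) -> dict:
    """Build the `error` object of a reply envelope for a known code."""
    return {"code": code, "message": message, "retryable": RETRYABLE[code]}
```

A `KeyError` on an unknown code is deliberate here: it surfaces a contract violation at the point of construction rather than shipping an unspecified code to the Console.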
Size Budget
- Request payload (MessagePack): ~16 KB (2048 × 2 float32 + envelope)
- Reply payload: ~160 bytes
- Latency budget: ≤ 20 ms p95 round-trip
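The ~16 KB figure is the raw float32 floor; a quick sanity calculation confirms it. Note that MessagePack encodes each float32 with a one-byte type tag (5 bytes per value) unless the arrays are shipped as raw `bin` payloads, so the actual wire size may run roughly 25% above this floor depending on the encoding chosen.

```python
import struct

POINTS = 2048
FLOAT32 = struct.calcsize("<f")      # standard-size float32: 4 bytes

# Raw payload floor: wavenumbers + intensities as float32
raw_bytes = POINTS * 2 * FLOAT32     # 16384 bytes, i.e. ~16 KB
```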
model_info / model_info_reply · Diagnostics (optional)
Returns active model metadata. Low frequency — called when the Console switches to the Model Select page.
{
"type": "model_info_reply",
"body": {
"active_version": "v5",
"algorithm": "PLS+Ridge",
"latent_components": 5,
"trained_at": "2026-03-15T10:00:00Z",
"trained_samples": 187,
"rmse": { "EPA": 0.047, "DHA": 0.052, "DPA": 0.061 },
"r2": { "EPA": 0.958, "DHA": 0.942, "DPA": 0.913 },
"wavenumber_range": [200.0, 3200.0],
"wavenumber_count": 2048
}
}
REST Endpoints · Management
Base URL: http://127.0.0.1:5556
POST /model/load · Hot-swap active model
Swap the active model without stopping the server.
// Request
{ "version": "v6.2", "source": "staged" }
// Response 200
{ "loaded_version": "v6.2", "previous_version": "v5", "load_time_ms": 234 }
- `400` — version not found / model file corrupted
- `409` — load in progress (concurrent request)
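A framework-agnostic sketch of the hot-swap handler behind this endpoint, returning `(status, body)` pairs that a FastAPI route would translate into HTTP responses. The registry/active dicts, lock, and function name are all illustrative:

```python
import threading

_load_lock = threading.Lock()

def load_model(registry: dict, active: dict, version: str) -> tuple[int, dict]:
    """Hot-swap the active model. `registry` maps version -> model object.

    Mirrors the REST contract: 200 on success, 400 for an unknown
    version, 409 when another load is already in progress.
    """
    if not _load_lock.acquire(blocking=False):
        return 409, {"error": "load in progress"}
    try:
        if version not in registry:
            return 400, {"error": f"version {version!r} not found"}
        previous = active.get("version")
        active["version"] = version
        active["model"] = registry[version]
        return 200, {"loaded_version": version, "previous_version": previous}
    finally:
        _load_lock.release()
```

The non-blocking lock acquisition is what turns a concurrent load into a clean 409 instead of two loads racing on the active slot.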
POST /training/start · Kick off training (async)
Starts a training job asynchronously, returns job_id.
Request (multipart):
- `csv` — file (training set, Raman + HPLC label)
- `meta.json` — `{ "source_batches": ["PR-0487"], "operator": "..." }`
Response 202:
{ "job_id": "train-2026-04-22-001", "estimated_duration_seconds": 180 }
GET /training/{job_id} · Poll progress
{
"job_id": "train-2026-04-22-001",
"status": "running",
"progress": 0.47,
"result": null
}
status ∈ {queued, running, done, failed}. On done: result = { "new_version": "v6.3", "metrics": {...} }.
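A client-side polling loop can be sketched transport-agnostically by injecting the HTTP call as a callable (here a `fetch` parameter returning the parsed status JSON; names and defaults are illustrative):

```python
import time

def wait_for_training(fetch, poll_interval: float = 2.0,
                      timeout: float = 600.0) -> dict:
    """Poll GET /training/{job_id} until the job leaves queued/running.

    `fetch` is any callable returning the parsed status JSON; raising
    on timeout keeps a stuck job from hanging the Console forever.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch()
        if status["status"] in ("done", "failed"):
            return status
        time.sleep(poll_interval)
    raise TimeoutError("training job did not finish in time")
```

Injecting `fetch` also makes the loop trivially testable with a stubbed sequence of responses.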
GET /model/list
{
"active": "v5",
"models": [
{ "version": "v5", "status": "active", "trained_at": "2026-03-15T..." },
{ "version": "v6.2", "status": "staged", "trained_at": "2026-04-20T..." }
]
}
GET /health · Kubernetes-style health check
{ "status": "ok", "model_loaded": true, "uptime_seconds": 3847 }
Error Handling Contract
| Scenario | Console Action |
|---|---|
| ZMQ `predict` timeout (> 500 ms) | Retry 2 times; if still failing → pause batch + alarm "Inference timeout" |
| `predict_reply.error.retryable = true` | Retry 1 time |
| `predict_reply.error.retryable = false` | Surface to operator · require signature to override |
| 3 consecutive `ping` timeouts | Alarm "Inference offline" + force batch hold |
| REST 4xx / 5xx | Surface to Model Select / Training page · does not affect inference |
Server errors are written as structured JSON log lines to stderr; IT can read them via the Windows Event Log or pull them through REST /logs.
Performance Targets
| Metric | Target | Source |
|---|---|---|
| `predict` round-trip p50 | ≤ 12 ms | |
| `predict` round-trip p95 | ≤ 20 ms | Architecture spec |
| `predict` round-trip p99 | ≤ 50 ms | |
| Throughput | 1 scan / 3 s | Raman RS2000 sampling rate |
| `ping` round-trip p95 | ≤ 5 ms | |
| Model load time | ≤ 1 s | PLS model is small |
| REST availability | 99.9% | Single-machine, no cluster |
`predict` latency budget: MessagePack decode ~1 ms · pre-process (baseline + SNV + Savitzky-Golay) ~3 ms · PLS+Ridge predict ~2 ms · MessagePack encode ~1 ms. Total server-side ~7 ms; wire ~10 ms → p95 ≤ 20 ms.
Versioning Policy
- Breaking changes — bump `v` in the envelope.
- Handshake — `ping`/`pong` exchange `protocol_version`. Mismatch → Console alarm + halt `predict`.
- Compatibility window — server maintains backward compatibility for at least one major version.
Recommended Stacks
Python Server
- `pyzmq >= 25`
- `msgpack >= 1.0`
- `fastapi` + `uvicorn` (for REST)
- `scikit-learn` (PLS + Ridge)
- `numpy`
main.py spawns two threads — one ZMQ REP loop, one FastAPI uvicorn.
C# Client · Future Jope.SMB App
- `NetMQ` 4.x
- `MessagePack-CSharp`
- `HttpClient` (REST)
Development Milestones
Recommended implementation sequence for the Python server:
| Week | Goal | Detail |
|---|---|---|
| 1 | Mock server | ping + predict return constants (EPA: 5.0, DHA: 3.0, DPA: 1.0). Unblocks Jope.SMB client development. |
| 2 | Real inference | Wire up PLS+Ridge with the existing pre-process pipeline. |
| 3 | Model hot swap | Implement REST /model/load + model registry. |
| 4 | Training pipeline | REST /training/start + /training/{id} polling. |
| 5 | Observability | Structured logs, /health, basic metrics. |
| 6 | Hardening | Error paths, timeout handling, edge cases, load test. |
Revision Log
| Version | Date | Author | Change |
|---|---|---|---|
| 0.1 | 2026-04-22 | Hubert | Initial draft based on Jope.SMB.Preview Architecture view |