ZeroMQ Handler
Send log messages via ZeroMQ for high-performance, low-latency log distribution.
Installation
pip install richcolorlog[zeromq]
# or
pip install pyzmq
Basic Usage
from richcolorlog import setup_logging
logger = setup_logging(
zeromq=True,
zeromq_host='localhost',
zeromq_port=5555,
)
logger.info("This goes via ZeroMQ!")
Configuration Parameters
Parameter |
Default |
Description |
|---|---|---|
|
|
Enable ZeroMQ handler |
|
|
ZeroMQ endpoint hostname |
|
|
ZeroMQ endpoint port |
|
|
Socket type: ‘PUB’ or ‘PUSH’ |
|
|
Minimum level for ZeroMQ |
Socket Types
PUB Socket (Default)
Publish-subscribe pattern. Logger binds, subscribers connect:
# Logger (Publisher)
logger = setup_logging(
zeromq=True,
zeromq_socket_type='PUB',
zeromq_port=5555,
)
PUSH Socket
Pipeline pattern. Logger connects to a collector:
# Logger (Pusher)
logger = setup_logging(
zeromq=True,
zeromq_socket_type='PUSH',
zeromq_host='collector.example.com',
zeromq_port=5555,
)
Message Format
Messages are sent as space-separated topic and JSON:
info {"timestamp": "2025-01-15T10:30:45.123456", "level": "INFO", ...}
The topic is the lowercase log level.
Subscriber Example
PUB/SUB Pattern
import zmq
import json
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
# Subscribe to all messages
socket.setsockopt_string(zmq.SUBSCRIBE, "")
# Or subscribe to specific levels
socket.setsockopt_string(zmq.SUBSCRIBE, "error")
socket.setsockopt_string(zmq.SUBSCRIBE, "critical")
while True:
message = socket.recv_string()
topic, json_data = message.split(" ", 1)
log = json.loads(json_data)
print(f"[{log['level']}] {log['message']}")
PUSH/PULL Pattern
import zmq
import json
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5555") # Collector binds
while True:
message = socket.recv_string()
topic, json_data = message.split(" ", 1)
log = json.loads(json_data)
print(f"[{log['level']}] {log['message']}")
Multi-Subscriber Architecture
App1 (PUB:5555) ──┐
│
App2 (PUB:5556) ──┼──► Collector (SUB) ──► Database
│
App3 (PUB:5557) ──┘
Proxy/Forwarder
For many-to-many routing:
import zmq
context = zmq.Context()
# Frontend: receive from publishers
frontend = context.socket(zmq.XSUB)
frontend.bind("tcp://*:5555")
# Backend: send to subscribers
backend = context.socket(zmq.XPUB)
backend.bind("tcp://*:5556")
# Start proxy
zmq.proxy(frontend, backend)
Applications connect to 5555, subscribers to 5556.
Production Configuration
logger = setup_logging(
name='production_app',
level='INFO',
zeromq=True,
zeromq_host='0.0.0.0', # Bind to all interfaces
zeromq_port=5555,
zeromq_socket_type='PUB',
zeromq_level='INFO',
)
Performance Considerations
ZeroMQ is designed for high-throughput:
Non-blocking sends
In-memory queuing
No broker overhead (peer-to-peer)
For extremely high volumes:
import zmq
# Set high water mark to prevent memory issues
socket.setsockopt(zmq.SNDHWM, 10000)
# Set linger to avoid hanging on close
socket.setsockopt(zmq.LINGER, 0)
Direct Handler Usage
import logging
from richcolorlog.logger import ZeroMQHandler
handler = ZeroMQHandler(
host='localhost',
port=5555,
socket_type='PUB',
level=logging.WARNING
)
logger = logging.getLogger('myapp')
logger.addHandler(handler)
Cleanup
# Manual cleanup
for handler in logger.handlers:
if isinstance(handler, ZeroMQHandler):
handler.close()
# Or use logging shutdown
logging.shutdown()