-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver_stream_simple.py
More file actions
158 lines (137 loc) · 5.27 KB
/
server_stream_simple.py
File metadata and controls
158 lines (137 loc) · 5.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import socket
import cv2
import numpy as np
import logging
import struct
import time
from flask import Flask, Response
from threading import Thread, Lock
# Configure logging: INFO and above, each record prefixed with a timestamp and level
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Configuration variables shared by the TCP receiver and the Flask web server
CONFIG = {
    'HOST': '',  # Empty host: bind the TCP listener to all interfaces
    'PORT_STREAM': 5001,  # TCP port the incoming video stream is accepted on
    'BUFFER_SIZE': 4096,  # Maximum number of bytes requested per recv() call
    'FRAME_TIMEOUT': 1.0,  # Gap between frames (seconds) after which a lag warning is logged
    'WEB_PORT': 8080,  # Port for the Flask web server
    'EXPECTED_CAMERA': 0  # Only frames carrying this cam_id are processed; others are skipped
}
# Initialize Flask app
app = Flask(__name__)
# Latest decoded frame and its edge map, shared between the receiver loop and
# the MJPEG generators; both are None until the first frame has been processed.
latest_frame = None
latest_edges = None
# Guards reads and writes of latest_frame / latest_edges across threads
frame_lock = Lock()
def receive_frame(client):
    """Receive one length-prefixed frame from the socket and decode it.

    Wire format: an 8-byte header of two big-endian unsigned 32-bit integers
    (cam_id, frame_size) followed by frame_size bytes of encoded image data.

    Args:
        client: Connected socket-like object exposing ``recv(n)``.

    Returns:
        ``(cam_id, frame)`` where frame is the image decoded by
        ``cv2.imdecode`` in colour mode, or ``(None, None)`` when the
        connection closed, the frame was empty or undecodable, or any
        exception occurred (the exception is logged, never raised).
    """
    try:
        # TCP is a byte stream: recv(8) may legally return fewer than 8 bytes,
        # so keep reading until the header is complete or the peer closes.
        header = b''
        while len(header) < 8:
            chunk = client.recv(8 - len(header))
            if not chunk:
                logging.warning("Incomplete header received")
                return None, None
            header += chunk
        cam_id, frame_size = struct.unpack('!II', header)
        logging.debug(f"Received header: cam_id={cam_id}, frame_size={frame_size}")
        if frame_size == 0:
            # Nothing to decode; imdecode rejects an empty buffer.
            logging.warning(f"Empty frame announced for cam {cam_id}")
            return None, None
        # Collect chunks in a list and join once; repeated bytes += is quadratic.
        chunks = []
        remaining = frame_size
        while remaining > 0:
            chunk = client.recv(min(CONFIG['BUFFER_SIZE'], remaining))
            if not chunk:
                logging.warning("Connection closed during frame receive")
                return None, None
            chunks.append(chunk)
            remaining -= len(chunk)
        frame_array = np.frombuffer(b''.join(chunks), dtype=np.uint8)
        frame = cv2.imdecode(frame_array, cv2.IMREAD_COLOR)
        if frame is None:
            logging.error(f"Failed to decode frame for cam {cam_id}")
            return None, None
        return cam_id, frame
    except Exception as e:
        # Best-effort receiver: report the problem and let the caller decide.
        logging.error(f"Error receiving frame: {e}")
        return None, None
def apply_edge_detection(frame):
    """Run Canny edge detection on a BGR frame.

    Returns the untouched input frame together with its edge map computed
    with thresholds 100 and 200, or ``(None, None)`` when no frame is given.
    """
    if frame is not None:
        # Canny is applied to the grayscale version of the frame.
        grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return frame, cv2.Canny(grayscale, 100, 200)
    return None, None
def generate_mjpeg_stream(data_type):
    """Yield an endless MJPEG multipart stream of the latest frame or edge map.

    Args:
        data_type: ``'frame'`` selects the latest camera frame; any other
            value selects the latest edge map.

    Yields:
        bytes: One multipart part (``--frame`` boundary, JPEG content type
        header, JPEG payload) per iteration in which an image is available
        and encodes successfully. The loop pauses 0.1 s between iterations.
    """
    while True:
        # Hold the lock only long enough to grab a reference. Encoding and
        # yielding happen outside it, so a generator suspended at ``yield``
        # (for example behind a slow HTTP client) never blocks the receiver
        # or the other stream.
        with frame_lock:
            snapshot = latest_frame if data_type == 'frame' else latest_edges
        if snapshot is not None:
            ok, jpeg = cv2.imencode('.jpg', snapshot)
            if ok:
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + jpeg.tobytes() + b'\r\n')
        time.sleep(0.1)
@app.route('/')
def index():
    """Serve the landing page embedding the camera feed and the edge-map feed."""
    # Both <img> tags point at the MJPEG endpoints under /video_feed/.
    page = '''
<html>
<body>
<h1>Single Camera Stream</h1>
<h2>Camera Feed</h2>
<img src="/video_feed/frame" width="320">
<h2>Edge Map</h2>
<img src="/video_feed/edges" width="320">
</body>
</html>
'''
    return page
@app.route('/video_feed/<data_type>')
def video_feed(data_type):
    """Return a streaming multipart response for the requested feed.

    The ``data_type`` path segment is handed unchanged to
    ``generate_mjpeg_stream``.
    """
    stream = generate_mjpeg_stream(data_type)
    # The boundary name must match the '--frame' marker emitted by the generator.
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')
def run_flask():
    """Start the Flask server on all interfaces; intended as a thread target."""
    web_port = CONFIG['WEB_PORT']
    app.run(host='0.0.0.0', port=web_port, threaded=True)
def main():
    """Accept one TCP client, process its frames, and publish them to the web feed.

    Starts the Flask server in a daemon thread, listens on
    ``CONFIG['PORT_STREAM']`` for a single connection (waiting at most 10 s),
    then loops: receive a frame, skip it if invalid or from an unexpected
    camera, run edge detection, and store the results in the module-level
    ``latest_frame`` / ``latest_edges`` under ``frame_lock``.

    The loop ends when the peer disconnects or an exception occurs; errors
    are logged rather than raised, and both sockets are always closed.
    """
    # The loop assigns to the shared module-level variables. Without this
    # declaration the assignments would create locals and the web stream
    # would never receive a frame.
    global latest_frame, latest_edges

    flask_thread = Thread(target=run_flask)
    flask_thread.daemon = True
    flask_thread.start()
    logging.info(f"Flask web server started at http://0.0.0.0:{CONFIG['WEB_PORT']}")

    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((CONFIG['HOST'], CONFIG['PORT_STREAM']))
    server.listen(1)
    server.settimeout(10.0)
    logging.info(f"Listening for stream on port {CONFIG['PORT_STREAM']}...")

    # Stays None if accept() fails, so the cleanup below knows there is
    # nothing to close.
    client = None
    try:
        client, addr = server.accept()
        logging.info(f"Connected to Pi at {addr}")
        # Monotonic clock: the interval check must not be affected by
        # wall-clock adjustments.
        last_frame_time = time.monotonic()
        while True:
            cam_id, frame = receive_frame(client)
            if frame is None or cam_id is None:
                # receive_frame reports every failure as (None, None). Peek at
                # the socket to tell a dead connection (empty read) from a
                # single bad frame; otherwise this loop would spin forever
                # once the peer has gone away.
                if not client.recv(1, socket.MSG_PEEK):
                    logging.info("Client disconnected")
                    break
                logging.warning("Skipping invalid frame")
                continue
            if cam_id != CONFIG['EXPECTED_CAMERA']:
                logging.warning(f"Unexpected cam_id {cam_id}, expected {CONFIG['EXPECTED_CAMERA']}")
                continue
            # Apply edge detection
            output_frame, edges = apply_edge_detection(frame)
            # Publish the new images for the MJPEG generators
            with frame_lock:
                latest_frame = output_frame
                latest_edges = edges
            logging.debug(f"Updated frame for cam {cam_id}")
            now = time.monotonic()
            if now - last_frame_time > CONFIG['FRAME_TIMEOUT']:
                logging.warning("Frame timeout, possible lag")
            last_frame_time = now
    except Exception as e:
        logging.error(f"Streaming error: {e}")
    finally:
        if client is not None:
            client.close()
        server.close()
        logging.info("Streaming stopped, resources released")
# Script entry point: run the streaming server when executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the server; log it instead of
        # letting the traceback reach the console.
        logging.info("Streaming server terminated by user")