admin管理员组文章数量:1122846
I'm trying to make a cluster processing system with a client, broker and nodes. When executing the receive_processed_video function, it stops receiving data after a random time. Is there anything that am I missing?
BROKER CODE
import socket
import threading
import os
# Configuraciones del broker
BROKER_HOST = 'localhost'
BROKER_PORT = 5000
NODE_PORT = 6000
# Guardar la lista de nodos
NODES = []
NODES_LOCK = threading.Lock()
def print_nodes():
"""Función para imprimir la lista actual de nodos conectados."""
with NODES_LOCK:
print("Nodos conectados:")
for i, node_socket in enumerate(NODES, start=1):
print(f" Nodo {i}: {node_socket.getpeername()}")
if not NODES:
print(" No hay nodos conectados.")
def handle_node_registration(node_socket):
with NODES_LOCK:
NODES.append(node_socket)
print("Nodo registrado.")
print_nodes()
try:
while True:
node_socket.recv(1024) # Mantener la conexión abierta
except:
with NODES_LOCK:
NODES.remove(node_socket)
print("Nodo desconectado.")
print_nodes()
def node_listener():
broker_node_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
broker_node_socket.bind((BROKER_HOST, NODE_PORT))
broker_node_socket.listen(5)
print(f"Broker de nodos corriendo en {BROKER_HOST}:{NODE_PORT}")
while True:
node_socket, _ = broker_node_socket.accept()
threading.Thread(target=handle_node_registration, args=(node_socket,)).start()
def distribute_video_to_nodes(video_data):
with NODES_LOCK:
if len(NODES) == 0:
print("No hay nodos disponibles.")
return
chunk_size = len(video_data) // len(NODES)
for i, node_socket in enumerate(NODES):
start_byte = i * chunk_size
end_byte = (i + 1) * chunk_size if i != len(NODES) - 1 else len(video_data)
video_chunk = video_data[start_byte:end_byte]
print(f"Enviando bytes del video a Nodo {i + 1}...")
# Enviar tamaño de los bytes y luego los datos del video
data_length = str(len(video_chunk)).encode()
node_socket.sendall(data_length.ljust(16)) # Enviar el tamaño ajustado a 16 bytes
node_socket.sendall(video_chunk)
print(f"Bytes enviados al Nodo {i + 1}.")
def receive_processed_video():
processed_video_parts = []
expected_length = 5297096 # Tamaño esperado del video (ajústalo según tu video)
with NODES_LOCK:
for i, node_socket in enumerate(NODES):
print(f"Recibiendo datos procesados del Nodo {i + 1}...")
# Recibir datos procesados en bloques
data = b""
retries = 5 # Número máximo de reintentos
while len(data) < expected_length and retries > 0:
try:
packet = node_socket.recv(4096) # 128 KB buffer size
if not packet:
break # Salir cuando el nodo cierra la conexión
data += packet
print(f"Recibido: {len(data)} bytes hasta ahora del Nodo {i + 1}")
except socket.timeout:
print("Tiempo de espera agotado. Intentando continuar...")
retries -= 1
if retries == 0:
print(f"Error: No se pudieron recibir todos los datos del Nodo {i + 1}")
break
if len(data) >= expected_length:
processed_video_parts.append(data)
print(f"Datos procesados recibidos completamente del Nodo {i + 1}.")
else:
print(f"Error: Datos incompletos recibidos del Nodo {i + 1}.")
print(f"Bytes esperados: {expected_length}, bytes recibidos: {len(data)}")
complete_video_data = b''.join(processed_video_parts)
print(f"Total de bytes recibidos después de la concatenación: {len(complete_video_data)}")
if len(complete_video_data) >= expected_length:
output_file = process_video_data(complete_video_data)
return complete_video_data
else:
print("Error: Los datos recibidos no coinciden con el tamaño esperado.")
return None
def process_video_data(data):
with open("video_guardado.mp4", "wb") as video_file:
video_file.write(data)
print("Video guardado como video_guardado.mp4")
return data
def broker_server():
broker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
broker_socket.bind((BROKER_HOST, BROKER_PORT))
broker_socket.listen(5)
print(f"Broker de clientes corriendo en {BROKER_HOST}:{BROKER_PORT}")
while True:
client_socket, _ = broker_socket.accept()
print("Cliente conectado.")
# Recibir el video del cliente
print("Recibiendo video del cliente...")
video_length = int(client_socket.recv(1024).decode())
video_data = b""
while len(video_data) < video_length:
print(f"Recibido: {len(video_data)} / {video_length}")
video_data += client_socket.recv(4096)
print("Video completo recibido.")
# Enviar el video a los nodos
distribute_video_to_nodes(video_data)
# Recibir el video procesado de los nodos
processed_video = receive_processed_video()
if processed_video:
with open("final_video.mp4", "wb") as video_file:
video_file.write(processed_video)
print("Video procesado guardado como 'final_video.mp4'.")
# Enviar el video procesado al cliente
client_socket.sendall(processed_video)
print("Video procesado enviado al cliente.")
else:
print("Error: No se recibieron datos procesados.")
client_socket.close()
if __name__ == "__main__":
threading.Thread(target=node_listener, daemon=True).start()
broker_server()
NODE CODE
import socket
import cv2
# Configuraciones del nodo
BROKER_HOST = '127.0.0.1'
BROKER_PORT = 6000 # Puerto donde escucha el broker
def process_video_data(video_data):
# Guardar el video recibido en un archivo llamado video_recibido.mp4
with open("video_recibido.mp4", "wb") as video_file:
video_file.write(video_data)
print("Video guardado como video_recibido.mp4")
# Abrir el archivo guardado localmente usando OpenCV
cap = cv2.VideoCapture("video_recibido.mp4")
if not cap.isOpened():
raise Exception("Error al abrir el archivo de video.")
# Inicialización del sustractor de fondo
fgbg = cv2.createBackgroundSubtractorKNN()
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
output_file = "video_grises.mp4"
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height), isColor=False)
while True:
ret, frame = cap.read()
if not ret:
break # Fin del video
# Aplicamos el sustractor de fondo y convertimos a escala de grises
img_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
out.write(img_gray)
cap.release()
out.release()
print("Video procesado y guardado como:", output_file)
return output_file
def node_server():
node_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
node_socket.connect((BROKER_HOST, BROKER_PORT))
print(f"Nodo conectado al broker en {BROKER_HOST}:{BROKER_PORT}")
try:
while True:
# Recibir el tamaño de los datos
data_length = node_socket.recv(16).strip()
if not data_length:
continue
data_length = int(data_length.decode())
# Recibir el video en bytes
video_data = b""
while len(video_data) < data_length:
packet = node_socket.recv(65536) # Incrementa el tamaño del buffer de recepción a 64 KB
if not packet:
break
video_data += packet
print("Video recibido")
# Procesar el video recibido
output_file = process_video_data(video_data)
# Enviar el video procesado al broker usando el mismo socket
send_video_to_broker(node_socket, output_file)
except Exception as e:
print(f"Error: {e}")
finally:
node_socket.close() # Cerrar el socket solo al finalizar toda la operación
print("Conexión con el broker cerrada.")
def send_video_to_broker(node_socket, output_file):
# Leer el video procesado
chunk_size = 1024 * 4 # 64 KB
with open(output_file, "rb") as video_file:
while True:
video_chunk = video_file.read(chunk_size)
if not video_chunk:
break
node_socket.sendall(video_chunk)
print(f"Enviado chunk de {len(video_chunk)} bytes al broker")
print("Video procesado enviado al broker")
if __name__ == "__main__":
node_server()
I have tried to increase the buffer size, to change the way the node sends the video. None of these have worked, it stops after a random amount of bytes received
本文标签: pythonWhy is my receiveprocessedvideo function not receiving the full amount of dataStack Overflow
版权声明:本文标题:python - Why is my receive_processed_video function not receiving the full amount of data - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1736300125a1930706.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论