admin管理员组

文章数量:1122846

I'm trying to make a cluster processing system with a client, broker and nodes. When executing the receive_processed_video function, it stops receiving data after a random time. Is there anything that am I missing?

BROKER CODE

import socket
import threading
import os

# Configuraciones del broker
BROKER_HOST = 'localhost'
BROKER_PORT = 5000
NODE_PORT = 6000

# Guardar la lista de nodos
NODES = []
NODES_LOCK = threading.Lock()


def print_nodes():
    """Función para imprimir la lista actual de nodos conectados."""
    with NODES_LOCK:
        print("Nodos conectados:")
        for i, node_socket in enumerate(NODES, start=1):
            print(f"  Nodo {i}: {node_socket.getpeername()}")
        if not NODES:
            print("  No hay nodos conectados.")


def handle_node_registration(node_socket):
    with NODES_LOCK:
        NODES.append(node_socket)
    print("Nodo registrado.")
    print_nodes()

    try:
        while True:
            node_socket.recv(1024)  # Mantener la conexión abierta
    except:
        with NODES_LOCK:
            NODES.remove(node_socket)
        print("Nodo desconectado.")
        print_nodes()


def node_listener():
    broker_node_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    broker_node_socket.bind((BROKER_HOST, NODE_PORT))
    broker_node_socket.listen(5)
    print(f"Broker de nodos corriendo en {BROKER_HOST}:{NODE_PORT}")

    while True:
        node_socket, _ = broker_node_socket.accept()
        threading.Thread(target=handle_node_registration, args=(node_socket,)).start()


def distribute_video_to_nodes(video_data):
    with NODES_LOCK:
        if len(NODES) == 0:
            print("No hay nodos disponibles.")
            return

        chunk_size = len(video_data) // len(NODES)
        for i, node_socket in enumerate(NODES):
            start_byte = i * chunk_size
            end_byte = (i + 1) * chunk_size if i != len(NODES) - 1 else len(video_data)
            video_chunk = video_data[start_byte:end_byte]
            print(f"Enviando bytes del video a Nodo {i + 1}...")

            # Enviar tamaño de los bytes y luego los datos del video
            data_length = str(len(video_chunk)).encode()
            node_socket.sendall(data_length.ljust(16))  # Enviar el tamaño ajustado a 16 bytes
            node_socket.sendall(video_chunk)
            print(f"Bytes enviados al Nodo {i + 1}.")


def receive_processed_video():
    processed_video_parts = []
    expected_length = 5297096  # Tamaño esperado del video (ajústalo según tu video)

    with NODES_LOCK:
        for i, node_socket in enumerate(NODES):
            print(f"Recibiendo datos procesados del Nodo {i + 1}...")

            # Recibir datos procesados en bloques
            data = b""
            retries = 5  # Número máximo de reintentos
            while len(data) < expected_length and retries > 0:
                try:
                    packet = node_socket.recv(4096)  # 128 KB buffer size
                    if not packet:
                        break  # Salir cuando el nodo cierra la conexión
                    data += packet
                    print(f"Recibido: {len(data)} bytes hasta ahora del Nodo {i + 1}")
                except socket.timeout:
                    print("Tiempo de espera agotado. Intentando continuar...")
                    retries -= 1
                    if retries == 0:
                        print(f"Error: No se pudieron recibir todos los datos del Nodo {i + 1}")
                        break

            if len(data) >= expected_length:
                processed_video_parts.append(data)
                print(f"Datos procesados recibidos completamente del Nodo {i + 1}.")
            else:
                print(f"Error: Datos incompletos recibidos del Nodo {i + 1}.")
                print(f"Bytes esperados: {expected_length}, bytes recibidos: {len(data)}")

    complete_video_data = b''.join(processed_video_parts)
    print(f"Total de bytes recibidos después de la concatenación: {len(complete_video_data)}")

    if len(complete_video_data) >= expected_length:
        output_file = process_video_data(complete_video_data)
        return complete_video_data
    else:
        print("Error: Los datos recibidos no coinciden con el tamaño esperado.")
        return None

def process_video_data(data):
    with open("video_guardado.mp4", "wb") as video_file:
        video_file.write(data)
    print("Video guardado como video_guardado.mp4")
    return data


def broker_server():
    broker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    broker_socket.bind((BROKER_HOST, BROKER_PORT))
    broker_socket.listen(5)
    print(f"Broker de clientes corriendo en {BROKER_HOST}:{BROKER_PORT}")

    while True:
        client_socket, _ = broker_socket.accept()
        print("Cliente conectado.")

        # Recibir el video del cliente
        print("Recibiendo video del cliente...")
        video_length = int(client_socket.recv(1024).decode())
        video_data = b""
        while len(video_data) < video_length:
            print(f"Recibido: {len(video_data)} / {video_length}")
            video_data += client_socket.recv(4096)

        print("Video completo recibido.")

        # Enviar el video a los nodos
        distribute_video_to_nodes(video_data)

        # Recibir el video procesado de los nodos
        processed_video = receive_processed_video()

        if processed_video:
            with open("final_video.mp4", "wb") as video_file:
                video_file.write(processed_video)
            print("Video procesado guardado como 'final_video.mp4'.")

            # Enviar el video procesado al cliente
            client_socket.sendall(processed_video)
            print("Video procesado enviado al cliente.")
        else:
            print("Error: No se recibieron datos procesados.")

        client_socket.close()


if __name__ == "__main__":
    threading.Thread(target=node_listener, daemon=True).start()
    broker_server()


NODE CODE

import socket
import cv2

# Configuraciones del nodo
BROKER_HOST = '127.0.0.1'
BROKER_PORT = 6000  # Puerto donde escucha el broker

def process_video_data(video_data):
    # Guardar el video recibido en un archivo llamado video_recibido.mp4
    with open("video_recibido.mp4", "wb") as video_file:
        video_file.write(video_data)
    print("Video guardado como video_recibido.mp4")

    # Abrir el archivo guardado localmente usando OpenCV
    cap = cv2.VideoCapture("video_recibido.mp4")

    if not cap.isOpened():
        raise Exception("Error al abrir el archivo de video.")

    # Inicialización del sustractor de fondo
    fgbg = cv2.createBackgroundSubtractorKNN()

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    output_file = "video_grises.mp4"
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height), isColor=False)

    while True:
        ret, frame = cap.read()
        if not ret:
            break  # Fin del video

        # Aplicamos el sustractor de fondo y convertimos a escala de grises
        img_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        out.write(img_gray)

    cap.release()
    out.release()

    print("Video procesado y guardado como:", output_file)
    return output_file

def node_server():
    node_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    node_socket.connect((BROKER_HOST, BROKER_PORT))
    print(f"Nodo conectado al broker en {BROKER_HOST}:{BROKER_PORT}")

    try:
        while True:
            # Recibir el tamaño de los datos
            data_length = node_socket.recv(16).strip()
            if not data_length:
                continue
            data_length = int(data_length.decode())
            # Recibir el video en bytes
            video_data = b""
            while len(video_data) < data_length:
                packet = node_socket.recv(65536)  # Incrementa el tamaño del buffer de recepción a 64 KB
                if not packet:
                    break
                video_data += packet
            print("Video recibido")

            # Procesar el video recibido
            output_file = process_video_data(video_data)

            # Enviar el video procesado al broker usando el mismo socket
            send_video_to_broker(node_socket, output_file)

    except Exception as e:
        print(f"Error: {e}")

    finally:
        node_socket.close()  # Cerrar el socket solo al finalizar toda la operación
        print("Conexión con el broker cerrada.")

def send_video_to_broker(node_socket, output_file):
    # Leer el video procesado
    chunk_size = 1024 * 4 # 64 KB
    with open(output_file, "rb") as video_file:
        while True:
            video_chunk = video_file.read(chunk_size)
            if not video_chunk:
                break
            node_socket.sendall(video_chunk)
            print(f"Enviado chunk de {len(video_chunk)} bytes al broker")
        print("Video procesado enviado al broker")

if __name__ == "__main__":
    node_server()

I have tried to increase the buffer size, to change the way the node sends the video. None of these have worked, it stops after a random amount of bytes received

本文标签: pythonWhy is my receiveprocessedvideo function not receiving the full amount of dataStack Overflow