I am using multiprocessing in my code. It works fine when run from cmd, VS Code, etc., but when I build an exe with PyInstaller the code executes from the beginning several times. I know this because the script should print an ASCII art banner just once, but when run from the exe the banner is printed several times, depending on how many processes I have created.

How it looks when run from VS Code:

How it looks when run from an exe created with PyInstaller, with one process created. As you can see, the ASCII art is printed again...

At the beginning I thought it was a PyInstaller problem, so I asked ChatGPT whether multiprocessing works in an exe created with PyInstaller. I told ChatGPT that I create the exe like this: pyinstaller "server_Camera2.py" --onefile

ChatGPT answered: That's a known issue when using multiprocessing with PyInstaller's --onefile mode. The problem stems from how PyInstaller bundles the application and handles the creation of processes within that bundled environment. Essentially, when you create a multiprocessing.Process, it tries to create a new process that's a copy of the main process. With --onefile, the entire application is a single executable, and the process creation mechanism can get confused, leading to the application seemingly running twice.
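
For context, a quick check of which start method multiprocessing is using can help when reasoning about that explanation. This is only a sketch: on Windows the only available start method is spawn, which launches a fresh interpreter for each child rather than copying the parent, so in a frozen build the child is the exe itself being launched again. The helper name describe_start_method is made up for illustration and is not part of my script.

```python
import multiprocessing
import sys


def describe_start_method() -> str:
    """Return a one-line summary of how child processes will be created."""
    method = multiprocessing.get_start_method()
    # PyInstaller sets sys.frozen on the bundled executable; a plain
    # interpreter run does not have this attribute.
    frozen = getattr(sys, "frozen", False)
    return f"start method: {method}, frozen: {frozen}, executable: {sys.executable}"


if __name__ == "__main__":
    print(describe_start_method())
```

Running this both from the interpreter and from a PyInstaller build should show whether sys.executable points at python.exe or at the bundled exe.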

I changed it to: pyinstaller "server_Camera2.py" --onedir

The result was the same. Later I noticed that it happens when the script executes this part of the code:

for thread in self.Cv2Threads:  # Start all Cv2 processes.
    thread.start()

That is when you can see in cmd that the ASCII art is printed again. If I comment out these lines, the exe does not show the weird behavior described above.

After refactoring and running several tests, the script keeps "starting" several times (depending on the number of processes created) when run from the exe. I can't pinpoint what I am doing wrong when creating the exe, because when run from VS Code, etc., the code works just fine.
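
One way to narrow this down, sketched here under the assumption that the repeated banner comes from separate processes rather than from the parent looping, is to tag each banner with the PID and the command-line arguments. tagged_banner is a hypothetical helper, not part of the script below, and the banner text is a placeholder.

```python
import multiprocessing
import os
import sys


def tagged_banner(text: str) -> str:
    """Prefix a banner with the PID, process name and extra argv entries."""
    proc = multiprocessing.current_process()
    return f"[pid={os.getpid()} name={proc.name} argv={sys.argv[1:]}] {text}"


if __name__ == "__main__":
    # If the banner shows up with different PIDs, each copy was printed by
    # a separate process that ran the script from the top.
    print(tagged_banner("CameraTool starting"))
```

If the extra copies carry arguments that were never passed on the command line, that would suggest the exe is being relaunched by multiprocessing itself.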

This is my code after refactoring (it still works fine from VS Code but misbehaves when run from the exe):

# Standard library imports
import datetime     # Time
import os
import queue
import re           # Regular Expressions
import socket       # SSH Communication
import sys
import traceback    # Error Handling
import threading    # Multithreading
import multiprocessing
import time         # Sleep threads to free the processor

# Third party imports  
import cv2
from configparser import ConfigParser
import pathlib

# Importing Custom Libraries
import Misc.Json
import Reports.dictionaries
from Logger.PyLogger import PyLogger
#sys.path.insert(0,('C:\XiL_Stress_Framework\Phoenix Stress Framework\CameraTool\camera_tool_lib'))
#import CV2functions
# -> To generate the exe with all libraries included
from camera_tool_lib import CV2functions 


# - Custom Exceptions ----------------------------------------------------------
class ConfigFileNotFoundException(Exception):
    pass

class PyCamera :

    def __init__ ( self                                                        ,
        VideoIterationNumber    : str                                          ,
        ) :

        Timestamp                   = datetime.datetime.now()
        TimestampStr                = f"{datetime.datetime.now():%Y%m%d_%H%M%S}"
        executable_path             = os.path.split(sys.executable)
        Executed_from_exe           = False

        # - Determine whether the script is running from an .exe to get the correct path
        if executable_path[1] == "server_Camera.exe":
            config_path = os.path.join(executable_path[0], "config.ini")
            Executed_from_exe       = True
        else:
            config_path = pathlib.Path(__file__).parent.absolute() / "config.ini"
            Executed_from_exe       = False

        # - If config.ini is not found, raise an exception
        if not os.path.exists(config_path):
            raise ConfigFileNotFoundException(
                """The config.ini file was not found, this file MUST be in the same folder as the "server_Camera" script or executable.""")

        # - Loading .ini file
        configur = ConfigParser() 
        configur.read(config_path)

        self.__VideoIterationNumber = VideoIterationNumber

        # - Retrieve the log variable that enables or disables logging.
        debug_cameraTool_str        = configur.get('debug_cameraTool','log_errors')
        # Anything other than "true" (case-insensitive) disables logging.
        debug_cameraTool            = debug_cameraTool_str.lower() == "true"

        self.__VehicleProgram       = configur.get('initialize_cameraTool','VehicleProgram')

        self.__LogFolderPath        = configur.get('initialize_cameraTool','LogFolderPath')
        # - Create log directory if necessary
        os.makedirs( self.__LogFolderPath, exist_ok = True )

        # - Construct log's path
        self.__LogPath              = f"{self.__LogFolderPath}/PyCamera_{TimestampStr}_{self.__VehicleProgram}__Log[{self.__VideoIterationNumber}].txt"

        self.__ImageFolderPath      = configur.get('initialize_cameraTool','ImageFolderPath')
        # - Create image directory if necessary
        os.makedirs( self.__ImageFolderPath, exist_ok = True )

        self.__VideoFolderPath      = configur.get('initialize_cameraTool','VideoFolderPath')
        # - Create video directory if necessary
        os.makedirs( self.__VideoFolderPath, exist_ok = True )

        self.Logger = PyLogger(                                        
            Enabled     = debug_cameraTool,
            ConsoleIds  = [ 0, 1, 2, 3 ],
            LogFileIds  = [ 0, 1, 2, 3 ],
            LogFilePath = self.__LogPath ) 

        self.__Video0Name           = configur.get('initialize_cameraTool','Video0Name')
        self.__Video1Name           = configur.get('initialize_cameraTool','Video1Name')
        self.__Video2Name           = configur.get('initialize_cameraTool','Video2Name')
        self.__Video3Name           = configur.get('initialize_cameraTool','Video3Name')
        self.__SelectCam0           = int(configur.get('initialize_cameraTool','SelectCam0'))
        self.__SelectCam1           = int(configur.get('initialize_cameraTool','SelectCam1'))
        self.__SelectCam2           = int(configur.get('initialize_cameraTool','SelectCam2'))
        self.__SelectCam3           = int(configur.get('initialize_cameraTool','SelectCam3'))
        self.__Cam0Width            = int(configur.get('initialize_cameraTool','Cam0Width'))
        self.__Cam0Height           = int(configur.get('initialize_cameraTool','Cam0Height'))
        self.__Cam1Width            = int(configur.get('initialize_cameraTool','Cam1Width'))
        self.__Cam1Height           = int(configur.get('initialize_cameraTool','Cam1Height'))
        self.__Cam2Width            = int(configur.get('initialize_cameraTool','Cam2Width'))
        self.__Cam2Height           = int(configur.get('initialize_cameraTool','Cam2Height'))
        self.__Cam3Width            = int(configur.get('initialize_cameraTool','Cam3Width'))
        self.__Cam3Height           = int(configur.get('initialize_cameraTool','Cam3Height'))
        self.__SelectCamFocus0      = int(configur.get('initialize_cameraTool','SelectCamFocus0'))
        self.__SelectCamFocus1      = int(configur.get('initialize_cameraTool','SelectCamFocus1'))
        self.__SelectCamFocus2      = int(configur.get('initialize_cameraTool','SelectCamFocus2'))
        self.__SelectCamFocus3      = int(configur.get('initialize_cameraTool','SelectCamFocus3'))
        self.__Ip                   = configur.get('initialize_cameraTool','Ip')
        self.__Port                 = int(configur.get('initialize_cameraTool','Port'))

        self.Logger.Log0( f"Executed from exe:'{Executed_from_exe}'" )
        self.Logger.Log0( f"The path of the ini file is:'{config_path}'" )
        self.Logger.Log0( f"The ImageFolderPath argument is:'{self.__ImageFolderPath}'")
        self.Logger.Log0( f"The VideoFolderPath argument is:'{self.__VideoFolderPath}'")
        self.Logger.Log0( f"The LogFolderPath argument is:'{self.__LogFolderPath}'" )
        self.Logger.Log0( f"The VehicleProgram argument is:'{self.__VehicleProgram}'" )
        self.Logger.Log0( f"The Ip argument is:'{self.__Ip }'" )
        self.Logger.Log0( f"The Port argument is:'{self.__Port }'" )
        self.Logger.Log0( f"The SelectCam0 argument is:'{self.__SelectCam0 }'" )
        self.Logger.Log0( f"The SelectCam1 argument is:'{self.__SelectCam1 }'" )
        self.Logger.Log0( f"The SelectCam2 argument is:'{self.__SelectCam2 }'" )
        self.Logger.Log0( f"The SelectCam3 argument is:'{self.__SelectCam3 }'" )

        self.Logger.Log0( f"The Cam0Width argument is:'{self.__Cam0Width }'" )
        self.Logger.Log0( f"The Cam0Height argument is:'{self.__Cam0Height }'" )

        self.Logger.Log0( f"The Cam1Width argument is:'{self.__Cam1Width }'" )
        self.Logger.Log0( f"The Cam1Height argument is:'{self.__Cam1Height }'" )

        self.Logger.Log0( f"The Cam2Width argument is:'{self.__Cam2Width }'" )
        self.Logger.Log0( f"The Cam2Height argument is:'{self.__Cam2Height }'" )
        
        self.Logger.Log0( f"The Cam3Width argument is:'{self.__Cam3Width }'" )
        self.Logger.Log0( f"The Cam3Height argument is:'{self.__Cam3Height }'" )

        self.Stop                   = multiprocessing.Event()
        camera_trigger0_shared_bool = multiprocessing.Event()
        camera_trigger1_shared_bool = multiprocessing.Event()
        camera_trigger2_shared_bool = multiprocessing.Event()
        camera_trigger3_shared_bool = multiprocessing.Event()

        camera_queue_0              = multiprocessing.Queue( maxsize = 0 )
        camera_queue_1              = multiprocessing.Queue( maxsize = 0 )
        camera_queue_2              = multiprocessing.Queue( maxsize = 0 )
        camera_queue_3              = multiprocessing.Queue( maxsize = 0 )


        print(self.Stop.is_set())
        print(camera_trigger0_shared_bool.is_set())
        print(camera_trigger1_shared_bool.is_set())
        print(camera_trigger2_shared_bool.is_set())
        print(camera_trigger3_shared_bool.is_set())

        self.Cv2Threads = [
                    multiprocessing.Process(name="Cv2_Zero", target=Cv2SM, args=(self.Stop, self.__SelectCam0, self.__Cam0Width, self.__Cam0Height, self.__SelectCamFocus0, self.__Video0Name, "Cv2_Zero", self.__VideoFolderPath, self.__VideoIterationNumber, camera_queue_0, camera_trigger0_shared_bool)),
                    #multiprocessing.Process(name="Cv2_One", target=Cv2SM, args=(self.Stop, self.__SelectCam1, self.__Cam1Width, self.__Cam1Height, self.__SelectCamFocus1, self.__Video1Name, "Cv2_One", self.__VideoFolderPath, self.__VideoIterationNumber, camera_queue_1, camera_trigger1_shared_bool)),
                    #multiprocessing.Process(name="Cv2_Two", target=Cv2SM, args=(self.Stop, self.__SelectCam2, self.__Cam2Width, self.__Cam2Height, self.__SelectCamFocus2, self.__Video2Name, "Cv2_Two", self.__VideoFolderPath, self.__VideoIterationNumber, camera_queue_2, camera_trigger2_shared_bool)),
                    #multiprocessing.Process(name="Cv2_Three", target=Cv2SM, args=(self.Stop, self.__SelectCam3, self.__Cam3Width, self.__Cam3Height, self.__SelectCamFocus3, self.__Video3Name, "Cv2_Three", self.__VideoFolderPath, self.__VideoIterationNumber, camera_queue_3, camera_trigger3_shared_bool)),
                ]
        
        self.Cv2Dict = {
            "Zero" : camera_trigger0_shared_bool,
            "One"  : camera_trigger1_shared_bool,
            "Two"  : camera_trigger2_shared_bool,
            "Three": camera_trigger3_shared_bool,
            }
        
        self.Cv2Dict2 = {
            "Zero" : camera_queue_0,
            "One"  : camera_queue_1,
            "Two"  : camera_queue_2,
            "Three": camera_queue_3,
            }

        self.__SshServerThread = threading.Thread(name = "SSHServer_THREAD",          
                                        target = self.__ServerSM
                                        )

        
        print("")

        self.StartThreads()

    def StartThreads ( self ) :
        """
        """
        self.Logger.Log0( "PyCamera.StartThreads(): Starting threads." )
        self.__SshServerThread.start()  # Start the SSH server thread.
        for thread in self.Cv2Threads:  # Start all Cv2 processes.
            thread.start()
        self.Logger.Log0( "PyCamera.StartThreads(): All threads started." )  
        self.__SshServerThread.join() # Wait for the SSH server thread to finish.
        for thread in self.Cv2Threads: # Wait for all the Cv2 processes to finish.
            thread.join()
        self.Logger.Log0("PyCamera.StartThreads(): All threads finished.")          
        return
    
    
    def StopThreads ( self ) -> bool :
        """
        """
        self.Stop.set()
        Success   = True
        self.Logger.Log0( f"PyCamera.StopThreads() -> {Success}" )
        return Success
    
    def CameraRecordTrigger ( self       ,
                    selectCam       : str,
                    camRecording    : str,) -> bool :
        """
        """
        if camRecording == "True" :
            self.Cv2Dict[selectCam].set()
        elif camRecording == "False" :
            self.Cv2Dict[selectCam].clear()
        Success = True
        self.Logger.Log0( f"PyCamera.CameraRecordTrigger() {self.Cv2Dict[selectCam].is_set()} -> {camRecording}" )
        return Success
    
    def TakePhotoArgs ( self                    ,
                        ImageName        : str  ,
                        Iteration        : str  ,
                        CamNumberName    : str  ,) -> str :
        """
        ### Description
        - Takes a photo with the desired camera number.
        - It names the photo with the variables "ImageName" and "Iteration".
        """
        IterationStr        = f"{int(Iteration):0>3}"
        FullImageName       = f"{ImageName}_{datetime.datetime.now():%Y%m%d_%H%M%S}_[{IterationStr}].jpg"
        FullImagePath       = os.path.join(self.__ImageFolderPath , FullImageName)
        try:
            cv2.imwrite(FullImagePath, self.Cv2Dict2[CamNumberName].get(timeout=0.5))
        except:
            print("Empty Queue TakePhotoArgs")
        #self.Logger.Log0( f"PyCamera.TakePhotoArgs() -> {""}: {FullImageName} " )
        return FullImageName

    def __ServerSM ( self ) :
        """
        """
        SshServerInitialized = False
        TimeoutSecs = 1.0
        while True :
            if not self.Stop.is_set() :
                if not SshServerInitialized :
                    self.Logger.Log0( f"PyCamera.ServerSM: Initialize" )
                    # - Initialize ---------------------------------------------
                    try :
                        SshServerAddress = ( self.__Ip , self.__Port )
                        SshServer = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
                        SshServer.settimeout( TimeoutSecs) 
                        SshServer.bind( SshServerAddress )
                        SshServer.listen( 1 ) 
                        SshServerInitialized = True
                    except :
                        ErrorStr = traceback.format_exc()
                        self.Logger.Log0( f"PyCamera.ServerSM: TCP/IP Server Initialization Failed!\n\"\"\"\n{ErrorStr}\n\"\"\"" )
                        SshServer.close()
                    # ----------------------------------------------------------
                    self.Logger.Log0( f"PyCamera.ServerSM: Initialized" )
                else : # if SshInitialized :
                    self.Logger.Log1( f"PyCamera.ServerSM: Awaiting Command" )
                    # - Await Command ------------------------------------------
                    try :
                        SshClient, _ = SshServer.accept()
                        Command = SshClient.recv( 2_147_483_647 ).decode( "UTF-8" ) # 2^31
                        self.Logger.Log1( f"PyCamera.ServerSM: Process Command \"{Command}\"" )
                        # - Process Command ------------------------------------
                        CommandProcessed = False
                        # - "Stop()"
                        if Command == "Stop()" :
                            Success = self.StopThreads()
                            ReplyBytes = f"{Success!r}".encode( "UTF-8" )
                            SshClient.sendall( ReplyBytes )
                            CommandProcessed = True
                        #   "CameraRecordTrigger() Zero True"
                        elif Match := re.match(
                                pattern = r"(?:CameraRecordTrigger\(\))" r"\s(?P<selectCam>\w+)" r"\s(?P<camRecording>\w+)",
                                string  = Command ) :
                            selectCam, camRecording,  = Match.groups()
                            Success = self.CameraRecordTrigger(
                                                        selectCam       = selectCam,
                                                        camRecording    = camRecording,)
                            ReplyBytes = f"{Success!r}".encode( "UTF-8" )
                            SshClient.sendall( ReplyBytes )
                            CommandProcessed = True
                        # - "TakePhotoArgs() TestImageName 2 Zero"
                        elif Match := re.match(
                                pattern = r"(?:TakePhotoArgs\(\))" r"\s(?P<ImageName>\w+)" r"\s(?P<Iteration>\d{1,3})" r"\s(?P<CamNumberName>\w+)",
                                string  = Command ) :
                            ImageName, Iteration, CamNumberName = Match.groups()
                            ImageName = self.TakePhotoArgs(
                                                        ImageName       = ImageName,
                                                        Iteration       = Iteration,
                                                        CamNumberName   = CamNumberName,)
                            ReplyBytes = f"{ImageName!r}".encode( "UTF-8" )
                            SshClient.sendall( ReplyBytes )
                            CommandProcessed = True
                        # - Feedback
                        if CommandProcessed :
                            self.Logger.Log1( f"PyCamera.ServerSM: Replied \"{ReplyBytes}\"" )
                        else :
                            SshClient.sendall( f"Unrecognized Command \"{Command}\"".encode( "UTF-8" ) )
                            self.Logger.Log1( f"PyCamera.ServerSM: Unrecognized Command \"{Command}\"" )
                        # ------------------------------------------------------
                    except socket.timeout : # SSH Timeout
                        self.Logger.Log2( f"PyCamera.ServerSM: Socket Timeout" )
                        pass
                    except : # Any other error
                        ErrorStr = traceback.format_exc()
                        self.Logger.Log2( f"PyCamera.ServerSM: Unknown Error!\n\"\"\"\n{ErrorStr}\n\"\"\"" )
                        pass
                    # ----------------------------------------------------------
            else: # if self.Stop :
                if SshServerInitialized :
                    self.Logger.Log0( f"PyCamera.ServerSM: Terminate" )
                    # - Terminate ----------------------------------------------
                    SshServer.close()
                    SshServerInitialized = False
                    # ----------------------------------------------------------
                    self.Logger.Log0( f"PyCamera.ServerSM: Terminated" )
                break
        self.Logger.Log0( f"PyCamera.ServerSM: Thread Exit" )
        return

    class Tests :
        """
        ### Description

        """
        # = Constructor ========================================================
        def __init__(self,
                Parent         : "PyCamera"                       ,
                ) :
            #   Parameters
            self.__Parent         = Parent
            self.Logger           = self.__Parent.Logger
            return
        
        def Executable ( ) :
            """
            This function is intended to test this script and make it run automatically.
            Using .bat or making a .exe
            """
            # ;f=Varsity&t=CameraTool%0Av2.1.1
            print(r"""
               ______                                         _________               __   
             .' ___  |                                       |  _   _  |             [  |  
            / .'   \_| ,--.   _ .--..--.  .---.  _ .--.  ,--.|_/ | | \_|.--.    .--.  | |  
            | |       `'_\ : [ `.-. .-. |/ /__\\[ `/'`\]`'_\ :   | |  / .'`\ \/ .'`\ \| |  
            \ `.___.'\// | |, | | | | | || \__., | |    // | |, _| |_ | \__. || \__. || |  
             `.____ .'\'-;__/[___||__||__]'.__.'[___]   \'-;__/|_____| '.__.'  '.__.'[___] 
                                        
                                                                                           """)
            print("--->PLEASE DO NOT CHANGE THE NAME OF THIS EXECUTABLE")       
            if len(sys.argv) > 1:
                sys.argv.pop(0)
                VideoIterationNumber, = sys.argv

                print(f"The 1st argument :'{VideoIterationNumber}'")


            print("Outside argv")
            # - Startup --------------------------------------------------------
            PyLogger = PyCamera(VideoIterationNumber = VideoIterationNumber   ,)
            return

        def Standalone ( VehicleProgram: str ) :
            """
            This method is intended to be used in VS Code (or any IDE).
            """
            # ;f=Varsity&t=CameraTool%0Av2.1.1
            print(r"""
               ______                                         _________               __   
             .' ___  |                                       |  _   _  |             [  |  
            / .'   \_| ,--.   _ .--..--.  .---.  _ .--.  ,--.|_/ | | \_|.--.    .--.  | |  
            | |       `'_\ : [ `.-. .-. |/ /__\\[ `/'`\]`'_\ :   | |  / .'`\ \/ .'`\ \| |  
            \ `.___.'\// | |, | | | | | || \__., | |    // | |, _| |_ | \__. || \__. || |  
             `.____ .'\'-;__/[___||__||__]'.__.'[___]   \'-;__/|_____| '.__.'  '.__.'[___]                                       
                                                                                           """)
            print("--->PLEASE DO NOT CHANGE THE NAME OF THIS EXECUTABLE")
            PyLogger = PyCamera(VideoIterationNumber   = "666"                  ,)
            return    
    
def Cv2SM ( Stop,
    SelectCam,
    __Width,
    __Height,
    __SelectCamFocus,
    __VideoName,
    Cv2Name,
    __VideoFolderPath,
    __VideoIterationNumber,
    camera_queue,
    CameraTrigger,
    ) :
    """
    This state machine gets frames from the camera.
    """
    Cv2ScreensCount = 0
    Cv2Initialized = False 
    VideoInitialized = False
    VideoTimeInterval = []
    while True :
        if not Stop.is_set() :   
            if not Cv2Initialized :

                # - Initialize -----------------------------------------

                Capture = CV2functions.initializeCV2(#self,
                                                SelectCam         = SelectCam,
                                                Width             = __Width  ,
                                                Height            = __Height ,
                                                SelectCamFocus    = __SelectCamFocus,)
                # - Create video directory if necessary
                if   Cv2Name == "Cv2_Zero" :
                    TemporaryVideoScreens = f"{__VideoFolderPath}/Frames_[{__VideoIterationNumber}]"
                elif Cv2Name == "Cv2_One"  :
                    TemporaryVideoScreens = f"{__VideoFolderPath}/Frames1_[{__VideoIterationNumber}]"                        
                elif Cv2Name == "Cv2_Two"  :
                    TemporaryVideoScreens = f"{__VideoFolderPath}/Frames2_[{__VideoIterationNumber}]"
                elif Cv2Name == "Cv2_Three"  :
                    TemporaryVideoScreens = f"{__VideoFolderPath}/Frames3_[{__VideoIterationNumber}]"
                os.makedirs( TemporaryVideoScreens , exist_ok = True )
                # - Calculate fps's ------------------------------------
                
                fps = CV2functions.calculateFPS(
                                        captureObject   = Capture  ,
                                        framesToCapture = 30            ,)
            
                Cv2Initialized  = True


                #self.Logger.Log0( f"{self.Cv2Name}.__Cv2SM.Initialized" )
            else : # if Cv2Initialized :
                Success, image_array, = CV2functions.takeFrame(
                    videoCaptureObject = Capture,
                )
                camera_queue.put(image_array)

                if  CameraTrigger.is_set() and Success :
                    if not VideoInitialized:
                        # - Initialize Video------------------------------------
                        fourcc = CV2functions.initializeVideoWriter(
                            codec = 'DIVX',
                        )
                        print(f"{Cv2Name}.# FPS :{fps}")
                        # - Video name to comply with flickering requirements 
                        if   Cv2Name == "Cv2_Zero" :
                            filename = f"{__VideoFolderPath}/{__VideoName}_[{__VideoIterationNumber}].avi"

                        elif Cv2Name == "Cv2_One"  :
                            filename = f"{__VideoFolderPath}/{__VideoName}1_[{__VideoIterationNumber}].avi"

                        elif Cv2Name == "Cv2_Two"  :
                            filename = f"{__VideoFolderPath}/{__VideoName}2_[{__VideoIterationNumber}].avi"

                        elif Cv2Name == "Cv2_Three"  :
                            filename = f"{__VideoFolderPath}/{__VideoName}3_[{__VideoIterationNumber}].avi"
                        out = CV2functions.videoWriter(
                            filename    = filename,
                            fourcc      = fourcc       ,
                            fps         = fps          ,
                            Width       = __Width ,
                            Height      = __Height,
                        )
                        VideoInitialized = True
                        
                    Cv2ScreensCount = Cv2ScreensCount + 1
                    
                    VideoTimeInterval.append(time.time())
                    CV2functions.saveVideo(
                                        videoWriterObject = out,
                                        frame = image_array               ,
                                        )
                try:
                    _ = camera_queue.get(block=False)
                except:
                    print("Empty Queue inside CV2")
                
        else : # if self.Parent.Stop :
            if Cv2Initialized :
                Capture.release()
                #self.Logger.Log0( f"{self.Cv2Name}.__Cv2SM.Terminate" )
                Cv2Initialized = False
                if VideoTimeInterval : # If empty list don't create json
                    ReportDict = Reports.dictionaries.JsonVideoTimeIntervals(
                        Starting_time     = VideoTimeInterval[0],
                        VideoTimeInterval = VideoTimeInterval   , )
                    # - Provisional Videolog name to comply with flickering requirements 
                    if Cv2Name == "Cv2_Zero" :
                        JsonPath = f"{__VideoFolderPath}/Videolog_[{__VideoIterationNumber}].json"
                    if Cv2Name == "Cv2_One" :
                        JsonPath = f"{__VideoFolderPath}/Videolog1_[{__VideoIterationNumber}].json"
                    if Cv2Name == "Cv2_Two" :
                        JsonPath = f"{__VideoFolderPath}/Videolog2_[{__VideoIterationNumber}].json"
                    if Cv2Name == "Cv2_Three" :
                        JsonPath = f"{__VideoFolderPath}/Videolog3_[{__VideoIterationNumber}].json"
                    Misc.Json.Write(
                        Object   = ReportDict,
                        FilePath = JsonPath  , )
                try:
                    _ = camera_queue.get(block=False)
                except:
                    print("Empty Queue inside CV2 when stop flag is activated")
            # Then break -----------------------------------------------
            break
    print(f"Is the queue from {Cv2Name} empty: {camera_queue.empty()}")
    print(f"How many items has the queue from {Cv2Name}, it has:{camera_queue.qsize()}")

    #self.Logger.Log0( f"{self.Cv2Name}.__Cv2SM.ThreadExit" )
    return 

if __name__ == "__main__" :
    Tests = [ 2 ]
    if 1 in Tests : PyCamera.Tests.Executable( )
    if 2 in Tests : PyCamera.Tests.Standalone( VehicleProgram = "TestStandalone")
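
A possible direction, offered as a sketch rather than a confirmed fix for this script: the Python documentation says multiprocessing.freeze_support() should be called straight after the if __name__ == "__main__" line when a program that uses multiprocessing is frozen into a Windows executable. The worker and main functions here are stand-ins for Cv2SM and the PyCamera startup, and the banner text is a placeholder.

```python
import multiprocessing


def worker(stop_event, results):
    """Stand-in for Cv2SM: report once, then wait for the stop signal."""
    results.put("worker started")
    stop_event.wait(timeout=5.0)


def main() -> str:
    """Stand-in for the PyCamera startup: banner, one child, clean stop."""
    print("banner: printed once by the parent")
    stop_event = multiprocessing.Event()
    results = multiprocessing.Queue()
    proc = multiprocessing.Process(target=worker, args=(stop_event, results))
    proc.start()
    message = results.get(timeout=10.0)
    stop_event.set()
    proc.join()
    return message


if __name__ == "__main__":
    # In a frozen Windows exe this call lets a child process run its target
    # instead of falling through to the code below; in a normal interpreter
    # run it has no effect.
    multiprocessing.freeze_support()
    print(main())
```

In the script above, the equivalent change would be a single multiprocessing.freeze_support() line placed before the Tests list, which I have not yet verified against the real build.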
