admin管理员组

文章数量:1316023

I'm implementing an Azure queuing mechanism to trigger multiple pipleines at the same time. I have two function apps, functiona_app1, functiona_app2. In each of these function apps, I have two pipelines created one for current customer and one for new customer. My requirement is that one pipeline should be triggered in each function app at the same time irrespective of the customer type. Other pipelines triggered should be in queue till any of the function app becomes available.

Suppose, If I trigger current customer, it should execute any of the current customer pipeline in any function app. If I trigger another current customer, it should execute the other pipeline in the other function app. And if I trigger new customer, since both the function apps are in use it should wait in the queue.

This is my PipelineManager code:

class PipelineManager:
    def __init__(self):
        self.data_factory_client = DataFactoryManagementClient(
            credential=DefaultAzureCredential(), subscription_id=SUBSCRIPTION_ID
        )
        # Reanize pipeline configurations by function app
        self.pipeline_configs = {
            "function_app1": {
                "max_concurrent": 1,  # Only one pipeline per function app
                "pipelines": {
                    "new_customer": {
                        "name": "new customer report with validation",
                        "priority": 1,
                    },
                    "current_customer": {
                        "name": "final current customer optimization",
                        "priority": 1,
                    },
                }
            },
            "function_app2": {
                "max_concurrent": 1,  # Only one pipeline per function app
                "pipelines": {
                    "new_customer": {
                        "name": "new customer pipeline 02",
                        "priority": 1,
                    },
                    "current_customer": {
                        "name": "current customer pipeline 02",
                        "priority": 1,
                    },
                }
            }
        }
        # Track running pipelines with timestamps and status
        self.running_pipelines = {}
        # Track pipeline usage history for load balancing
        self.pipeline_usage = {}  # {pipeline_name: last_used_timestamp}
        # Track function app usage
        self.function_app_running = {}  # {function_app: {run_id, start_time}}

    def _cleanup_running_pipelines(self):
        """Clean up completed or old pipeline entries and function app tracking"""
        current_time = datetime.now()
        to_remove = []
        
        for run_id, info in self.running_pipelines.items():
            if info["status"] in ["Succeeded", "Failed", "Cancelled"]:
                if current_time - info["start_time"] > timedelta(hours=1):
                    to_remove.append(run_id)
                    # Clean up function app tracking
                    for func_app, running_info in self.function_app_running.items():
                        if running_info.get("run_id") == run_id:
                            del self.function_app_running[func_app]
            elif current_time - info["start_time"] > timedelta(hours=24):  # Safety cleanup
                to_remove.append(run_id)
                
        for run_id in to_remove:
            del self.running_pipelines[run_id]

    def _get_function_app_availability(self):
        """Check which function apps are available"""
        self._cleanup_running_pipelines()
        available_apps = {}
        
        for func_app in self.pipeline_configs.keys():
            is_available = func_app not in self.function_app_running or \
                         self.function_app_running[func_app]["run_id"] not in self.running_pipelines
            available_apps[func_app] = is_available
            
        return available_apps

    def _calculate_pipeline_score(self, func_app, pipeline_name, priority):
        """Calculate a score for pipeline selection based on multiple factors"""
        current_time = datetime.now()
        
        # Time since last use score (0-1, higher is better)
        last_used = self.pipeline_usage.get(pipeline_name, current_time - timedelta(hours=24))
        time_since_use = (current_time - last_used).total_seconds()
        time_score = min(time_since_use / 3600, 1.0)  # Cap at 1 hour
        
        # Priority score (0-1, lower is better)
        priority_score = priority / 10  # Assuming max priority is 10
        
        # Function app usage score (0-1, higher is better)
        app_last_used = self.function_app_running.get(func_app, {}).get("start_time", current_time - timedelta(hours=24))
        app_time_score = min((current_time - app_last_used).total_seconds() / 3600, 1.0)
        
        # Add small random factor for tie-breaking (0-0.1)
        randomization = random.uniform(0, 0.1)
        
        # Combine scores (lower is better)
        final_score = (
            (1 - time_score) * 0.3 +     # 30% weight to time since last use
            priority_score * 0.2 +        # 20% weight to priority
            (1 - app_time_score) * 0.4 +  # 40% weight to function app availability
            randomization * 0.1           # 10% weight to randomization
        )
        
        return final_score

    def get_available_pipeline(self, analysis_type):
        """Choose the most suitable pipeline based on function app availability"""
        logger.info(f"\nSelecting pipeline for {analysis_type}")
        
        # Get function app availability
        available_apps = self._get_function_app_availability()
        logger.info("Function app availability:")
        for app, available in available_apps.items():
            logger.info(f"  {app}: {'Available' if available else 'In Use'}")
        
        available_pipelines = []
        for func_app, config in self.pipeline_configs.items():
            if not available_apps[func_app]:
                continue
                
            if analysis_type in config["pipelines"]:
                pipeline_config = config["pipelines"][analysis_type]
                score = self._calculate_pipeline_score(
                    func_app,
                    pipeline_config["name"],
                    pipeline_config["priority"]
                )
                
                available_pipelines.append({
                    "name": pipeline_config["name"],
                    "score": score,
                    "function_app": func_app
                })
        
        if not available_pipelines:
            logger.warning("No available pipelines found!")
            return None
        
        # Sort by score (lower is better)
        available_pipelines.sort(key=lambda x: x["score"])
        selected = available_pipelines[0]
        
        logger.info(f"\nSelected pipeline: {selected['name']}")
        logger.info(f"  Function App: {selected['function_app']}")
        logger.info(f"  Score: {selected['score']:.3f}")
        
        # Update usage timestamps
        self.pipeline_usage[selected['name']] = datetime.now()
        
        return selected["name"]

    def get_pipeline_status(self, run_id):
        """Get the status of a pipeline run"""
        try:
            run_response = self.data_factory_client.pipeline_runs.get(
                resource_group_name=RESOURCE_GROUP_NAME,
                factory_name=DATA_FACTORY_NAME,
                run_id=run_id,
            )
            status = run_response.status

            # Update status in our tracking
            if run_id in self.running_pipelines:
                self.running_pipelines[run_id]["status"] = status
                
                # If pipeline is complete, clean up function app tracking
                if status in ["Succeeded", "Failed", "Cancelled"]:
                    for func_app, running_info in self.function_app_running.items():
                        if running_info.get("run_id") == run_id:
                            del self.function_app_running[func_app]

            return status
            
        except Exception as e:
            logger.error(f"Failed to get pipeline status: {e}")
            if run_id in self.running_pipelines:
                # Clean up tracking on error
                func_app = self.running_pipelines[run_id].get("function_app")
                if func_app and func_app in self.function_app_running:
                    del self.function_app_running[func_app]
                del self.running_pipelines[run_id]
            return "Failed"

    def run_pipeline(self, analysis_type, parameters):
        try:
            # Log current pipeline loads and metrics
            logger.info("\nStarting pipeline run")
            logger.info("Current pipeline metrics:")
            metrics = self.get_pipeline_metrics()
            logger.info(json.dumps(metrics, indent=2))

            pipeline_name = self.get_available_pipeline(analysis_type)
            if not pipeline_name:
                raise ValueError(f"No available pipeline for analysis type: {analysis_type}")

            # Find which function app this pipeline belongs to
            function_app = None
            for app, config in self.pipeline_configs.items():
                if any(p["name"] == pipeline_name for p in config["pipelines"].values()):
                    function_app = app
                    break

            run_response = self.data_factory_client.pipelines.create_run(
                resource_group_name=RESOURCE_GROUP_NAME,
                factory_name=DATA_FACTORY_NAME,
                pipeline_name=pipeline_name,
                parameters=parameters,
            )

            # Track the running pipeline
            self.running_pipelines[run_response.run_id] = {
                "pipeline_name": pipeline_name,
                "start_time": datetime.now(),
                "status": "InProgress",
                "function_app": function_app
            }

            # Track function app usage
            self.function_app_running[function_app] = {
                "run_id": run_response.run_id,
                "start_time": datetime.now()
            }

            logger.info(f"Started pipeline '{pipeline_name}' on {function_app} with run_id: {run_response.run_id}")
            return run_response.run_id

        except Exception as e:
            logger.error(f"Failed to start pipeline: {e}")
            return None

    def get_pipeline_metrics(self):
        """Get current metrics for all function apps and pipelines"""
        self._cleanup_running_pipelines()
        metrics = {}
        
        for func_app, config in self.pipeline_configs.items():
            metrics[func_app] = {
                "is_available": func_app not in self.function_app_running,
                "current_run": None,
                "pipelines": {}
            }
            
            if func_app in self.function_app_running:
                run_id = self.function_app_running[func_app]["run_id"]
                if run_id in self.running_pipelines:
                    metrics[func_app]["current_run"] = {
                        "pipeline": self.running_pipelines[run_id]["pipeline_name"],
                        "run_id": run_id,
                        "start_time": self.running_pipelines[run_id]["start_time"].isoformat(),
                        "status": self.running_pipelines[run_id]["status"]
                    }
            
            for analysis_type, pipeline_config in config["pipelines"].items():
                pipeline_name = pipeline_config["name"]
                last_used = self.pipeline_usage.get(pipeline_name)
                
                metrics[func_app]["pipelines"][analysis_type] = {
                    "name": pipeline_name,
                    "last_used": last_used.isoformat() if last_used else None
                }
        
        return metrics

I'm using a scoring mechanism to try if based on the availability of the pipeline, the pipeline will be executed. But I'm facing an issue where the pipelines from same function app are running at the same time. Not sure where the issue is. Please help!

本文标签: pythonIssue in running pipelines associated with multiple function apps in the same ADFStack Overflow