I'm implementing an Azure queuing mechanism to trigger multiple pipelines at the same time. I have two function apps, function_app1 and function_app2. In each function app I have two pipelines, one for current customers and one for new customers. My requirement is that only one pipeline may run in each function app at a time, irrespective of the customer type. Any further triggers should wait in a queue until one of the function apps becomes available.
For example, if I trigger a current-customer run, it should execute the current-customer pipeline in either function app. If I trigger another current-customer run, it should execute the corresponding pipeline in the other function app. If I then trigger a new-customer run while both function apps are in use, it should wait in the queue.
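To make the intended behaviour concrete, here is a minimal, self-contained sketch of the dispatching I'm after (no Azure calls; the locks, queue, and timings here are only illustrative, not my production code):

import queue
import threading
import time

trigger_queue = queue.Queue()  # triggers wait here until a function app frees up
app_locks = {"function_app1": threading.Lock(), "function_app2": threading.Lock()}

def run(app, customer_type, lock):
    try:
        print(f"{app} running {customer_type} pipeline")
        time.sleep(2)  # stands in for the real pipeline run
    finally:
        lock.release()  # function app becomes available again

def dispatcher():
    while True:
        customer_type = trigger_queue.get()  # blocks until a trigger arrives
        while True:
            for app, lock in app_locks.items():
                if lock.acquire(blocking=False):  # claim the first free function app
                    threading.Thread(target=run, args=(app, customer_type, lock)).start()
                    break
            else:
                time.sleep(0.5)  # both apps busy: the trigger keeps waiting
                continue
            break

threading.Thread(target=dispatcher, daemon=True).start()
for trigger in ["current_customer", "current_customer", "new_customer"]:
    trigger_queue.put(trigger)
time.sleep(8)  # give the sketch time to drain the queue

In this sketch the two current-customer triggers land on different function apps, and the new-customer trigger waits until one of them finishes, which is exactly the behaviour I want from my real code.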
This is my PipelineManager code:
import json
import logging
import random
from datetime import datetime, timedelta

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

# SUBSCRIPTION_ID, RESOURCE_GROUP_NAME and DATA_FACTORY_NAME are defined at module level
logger = logging.getLogger(__name__)


class PipelineManager:
    def __init__(self):
        self.data_factory_client = DataFactoryManagementClient(
            credential=DefaultAzureCredential(), subscription_id=SUBSCRIPTION_ID
        )
        # Organize pipeline configurations by function app
        self.pipeline_configs = {
            "function_app1": {
                "max_concurrent": 1,  # Only one pipeline per function app
                "pipelines": {
                    "new_customer": {
                        "name": "new customer report with validation",
                        "priority": 1,
                    },
                    "current_customer": {
                        "name": "final current customer optimization",
                        "priority": 1,
                    },
                },
            },
            "function_app2": {
                "max_concurrent": 1,  # Only one pipeline per function app
                "pipelines": {
                    "new_customer": {
                        "name": "new customer pipeline 02",
                        "priority": 1,
                    },
                    "current_customer": {
                        "name": "current customer pipeline 02",
                        "priority": 1,
                    },
                },
            },
        }
        # Track running pipelines with timestamps and status
        self.running_pipelines = {}
        # Track pipeline usage history for load balancing
        self.pipeline_usage = {}  # {pipeline_name: last_used_timestamp}
        # Track function app usage
        self.function_app_running = {}  # {function_app: {run_id, start_time}}
    def _cleanup_running_pipelines(self):
        """Clean up completed or old pipeline entries and function app tracking"""
        current_time = datetime.now()
        to_remove = []
        for run_id, info in self.running_pipelines.items():
            if info["status"] in ["Succeeded", "Failed", "Cancelled"]:
                if current_time - info["start_time"] > timedelta(hours=1):
                    to_remove.append(run_id)
                    # Clean up function app tracking (iterate over a copy so
                    # deleting entries doesn't mutate the dict mid-iteration)
                    for func_app, running_info in list(self.function_app_running.items()):
                        if running_info.get("run_id") == run_id:
                            del self.function_app_running[func_app]
            elif current_time - info["start_time"] > timedelta(hours=24):  # Safety cleanup
                to_remove.append(run_id)
        for run_id in to_remove:
            del self.running_pipelines[run_id]
    def _get_function_app_availability(self):
        """Check which function apps are available"""
        self._cleanup_running_pipelines()
        available_apps = {}
        for func_app in self.pipeline_configs.keys():
            is_available = func_app not in self.function_app_running or \
                self.function_app_running[func_app]["run_id"] not in self.running_pipelines
            available_apps[func_app] = is_available
        return available_apps
    def _calculate_pipeline_score(self, func_app, pipeline_name, priority):
        """Calculate a score for pipeline selection based on multiple factors"""
        current_time = datetime.now()
        # Time since last use score (0-1, higher is better)
        last_used = self.pipeline_usage.get(pipeline_name, current_time - timedelta(hours=24))
        time_since_use = (current_time - last_used).total_seconds()
        time_score = min(time_since_use / 3600, 1.0)  # Cap at 1 hour
        # Priority score (0-1, lower is better)
        priority_score = priority / 10  # Assuming max priority is 10
        # Function app usage score (0-1, higher is better)
        app_last_used = self.function_app_running.get(func_app, {}).get(
            "start_time", current_time - timedelta(hours=24)
        )
        app_time_score = min((current_time - app_last_used).total_seconds() / 3600, 1.0)
        # Add small random factor for tie-breaking (0-0.1)
        randomization = random.uniform(0, 0.1)
        # Combine scores (lower is better)
        final_score = (
            (1 - time_score) * 0.3 +      # 30% weight to time since last use
            priority_score * 0.2 +        # 20% weight to priority
            (1 - app_time_score) * 0.4 +  # 40% weight to function app availability
            randomization * 0.1           # 10% weight to randomization
        )
        return final_score
    def get_available_pipeline(self, analysis_type):
        """Choose the most suitable pipeline based on function app availability"""
        logger.info(f"\nSelecting pipeline for {analysis_type}")
        # Get function app availability
        available_apps = self._get_function_app_availability()
        logger.info("Function app availability:")
        for app, available in available_apps.items():
            logger.info(f"  {app}: {'Available' if available else 'In Use'}")
        available_pipelines = []
        for func_app, config in self.pipeline_configs.items():
            if not available_apps[func_app]:
                continue
            if analysis_type in config["pipelines"]:
                pipeline_config = config["pipelines"][analysis_type]
                score = self._calculate_pipeline_score(
                    func_app,
                    pipeline_config["name"],
                    pipeline_config["priority"],
                )
                available_pipelines.append({
                    "name": pipeline_config["name"],
                    "score": score,
                    "function_app": func_app,
                })
        if not available_pipelines:
            logger.warning("No available pipelines found!")
            return None
        # Sort by score (lower is better)
        available_pipelines.sort(key=lambda x: x["score"])
        selected = available_pipelines[0]
        logger.info(f"\nSelected pipeline: {selected['name']}")
        logger.info(f"  Function App: {selected['function_app']}")
        logger.info(f"  Score: {selected['score']:.3f}")
        # Update usage timestamps
        self.pipeline_usage[selected['name']] = datetime.now()
        return selected["name"]
    def get_pipeline_status(self, run_id):
        """Get the status of a pipeline run"""
        try:
            run_response = self.data_factory_client.pipeline_runs.get(
                resource_group_name=RESOURCE_GROUP_NAME,
                factory_name=DATA_FACTORY_NAME,
                run_id=run_id,
            )
            status = run_response.status
            # Update status in our tracking
            if run_id in self.running_pipelines:
                self.running_pipelines[run_id]["status"] = status
                # If pipeline is complete, clean up function app tracking
                # (iterate over a copy so deletion doesn't break the loop)
                if status in ["Succeeded", "Failed", "Cancelled"]:
                    for func_app, running_info in list(self.function_app_running.items()):
                        if running_info.get("run_id") == run_id:
                            del self.function_app_running[func_app]
            return status
        except Exception as e:
            logger.error(f"Failed to get pipeline status: {e}")
            if run_id in self.running_pipelines:
                # Clean up tracking on error
                func_app = self.running_pipelines[run_id].get("function_app")
                if func_app and func_app in self.function_app_running:
                    del self.function_app_running[func_app]
                del self.running_pipelines[run_id]
            return "Failed"
    def run_pipeline(self, analysis_type, parameters):
        try:
            # Log current pipeline loads and metrics
            logger.info("\nStarting pipeline run")
            logger.info("Current pipeline metrics:")
            metrics = self.get_pipeline_metrics()
            logger.info(json.dumps(metrics, indent=2))
            pipeline_name = self.get_available_pipeline(analysis_type)
            if not pipeline_name:
                raise ValueError(f"No available pipeline for analysis type: {analysis_type}")
            # Find which function app this pipeline belongs to
            function_app = None
            for app, config in self.pipeline_configs.items():
                if any(p["name"] == pipeline_name for p in config["pipelines"].values()):
                    function_app = app
                    break
            run_response = self.data_factory_client.pipelines.create_run(
                resource_group_name=RESOURCE_GROUP_NAME,
                factory_name=DATA_FACTORY_NAME,
                pipeline_name=pipeline_name,
                parameters=parameters,
            )
            # Track the running pipeline
            self.running_pipelines[run_response.run_id] = {
                "pipeline_name": pipeline_name,
                "start_time": datetime.now(),
                "status": "InProgress",
                "function_app": function_app,
            }
            # Track function app usage
            self.function_app_running[function_app] = {
                "run_id": run_response.run_id,
                "start_time": datetime.now(),
            }
            logger.info(f"Started pipeline '{pipeline_name}' on {function_app} with run_id: {run_response.run_id}")
            return run_response.run_id
        except Exception as e:
            logger.error(f"Failed to start pipeline: {e}")
            return None
    def get_pipeline_metrics(self):
        """Get current metrics for all function apps and pipelines"""
        self._cleanup_running_pipelines()
        metrics = {}
        for func_app, config in self.pipeline_configs.items():
            metrics[func_app] = {
                "is_available": func_app not in self.function_app_running,
                "current_run": None,
                "pipelines": {},
            }
            if func_app in self.function_app_running:
                run_id = self.function_app_running[func_app]["run_id"]
                if run_id in self.running_pipelines:
                    metrics[func_app]["current_run"] = {
                        "pipeline": self.running_pipelines[run_id]["pipeline_name"],
                        "run_id": run_id,
                        "start_time": self.running_pipelines[run_id]["start_time"].isoformat(),
                        "status": self.running_pipelines[run_id]["status"],
                    }
            for analysis_type, pipeline_config in config["pipelines"].items():
                pipeline_name = pipeline_config["name"]
                last_used = self.pipeline_usage.get(pipeline_name)
                metrics[func_app]["pipelines"][analysis_type] = {
                    "name": pipeline_name,
                    "last_used": last_used.isoformat() if last_used else None,
                }
        return metrics
I'm using this scoring mechanism so that a pipeline is selected based on function app availability. But I'm facing an issue where pipelines from the same function app run at the same time. I'm not sure where the issue is. Please help!
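For reference, this is roughly how the manager gets called (simplified; in reality the triggers arrive as concurrent requests, and the parameter names here are placeholders):

manager = PipelineManager()

# Two triggers arriving close together: I expect them to land on different function apps
run_id_1 = manager.run_pipeline("current_customer", {"customer_id": "123"})
run_id_2 = manager.run_pipeline("current_customer", {"customer_id": "456"})

print(manager.get_pipeline_status(run_id_1), manager.get_pipeline_status(run_id_2))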