I'm trying to run a Dataflow job using a Flex Template in Docker. Here is what I have:


FROM python:3.11-slim

COPY --from=apache/beam_python3.11_sdk:2.54.0 /opt/apache/beam /opt/apache/beam

COPY --from=gcr.io/dataflow-templates-base/python311-template-launcher-base:20230622_RC00 /opt/google/dataflow/python_template_launcher /opt/google/dataflow/python_template_launcher

# Install necessary dependencies for Java
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
    wget \
    ca-certificates \
    openjdk-17-jdk && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Set JAVA_HOME environment variable (optional, but recommended)
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
ENV PATH="${JAVA_HOME}/bin:${PATH}"

# Verify Java installation (optional, but good practice)
RUN java -version

# Location to store the pipeline artifacts.
ARG WORKDIR=/template
WORKDIR ${WORKDIR}

COPY main.py .
COPY setup.py .
COPY requirements.txt .

# Installing an exhaustive list of dependencies from requirements.txt
# helps to ensure that every time the Docker container image is built,
# the Python dependencies stay the same. Using `--no-cache-dir` reduces image size.
RUN pip install --no-cache-dir -r requirements.txt

# Installing the pipeline package makes all modules encompassing the pipeline
# available via import statements and installs necessary dependencies.
# Editable installation allows picking up later changes to the pipeline code
# for example during local experimentation within the container.
RUN pip install -e .

# For more information, see the Dataflow Flex Templates documentation.
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"

# Because this image will be used as a custom SDK container image, and it already
# installs the dependencies from the requirements.txt, we can omit
# the FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE directive here
# to reduce pipeline submission time.
# Similarly, since we already installed the pipeline package,
# we don't have to specify the FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py" configuration option.

# Optionally, verify that dependencies are not conflicting.
# A conflict may or may not be significant for your pipeline.
RUN pip check

# Optionally, list all installed dependencies.
# The output can be used to seed requirements.txt for reproducible builds.
RUN pip freeze

# Set the entrypoint to Apache Beam SDK launcher, which allows this image
# to be used as an SDK container image.
ENTRYPOINT ["/opt/apache/beam/boot"]
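
Since the comments in the Dockerfile say this image also serves as the custom SDK container image, my understanding from the Dataflow docs (treat this as an assumption on my part) is that the same image has to be handed to the workers through the sdk_container_image pipeline option, roughly like this sketch:

# Sketch only: pass the image built above to the workers as the custom SDK container.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    # ... the rest of my options ...
    sdk_container_image="us-east1-docker.pkg.dev/<PROJECT_ID>/dataflow-abc-repo/sql-server-to-bq-2:latest",
)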

Below my apache beam code:


import argparse
import logging

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions



def run(argv=None):
    
    """Runs the Apache Beam pipeline to read a CSV file from GCS and write it to BigQuery with specified schema, adding a timestamp column.

    Args:
        input_file_path: The GCS path to the CSV file (e.g., gs://your-bucket/path/to/data.csv).
        project_id: The ID of the Google Cloud project.
        dataset: The ID of the BigQuery dataset.
        table: The ID of the BigQuery table.
        network: The VPC network to use.
        subnetwork: The subnetwork to use.
        table_schema: The JSON string containing the BigQuery schema definition.
    """
    try:
        
        parser = argparse.ArgumentParser(description="Dataflow Flex Template")
        
        parser.add_argument("--project", required=True, help="GCP project.")
        parser.add_argument("--region", required=True, help="GCP region.")
        parser.add_argument("--staging_location", required=True, help="GCS staging location.")
        parser.add_argument("--temp_location", required=True, help="GCS temp location.")
        parser.add_argument("--network", help="VPC Network Name")
        parser.add_argument("--subnetwork", help="Subnetwork URL")
        parser.add_argument("--service_account_email", help="Service account email")
        parser.add_argument("--runner", default="DataflowRunner", help="Dataflow Runner")

        # Changed this to parse_known_args
        args, pipeline_args = parser.parse_known_args(argv)

        # Print argument values for debugging
        print(f"Project: {args.project}")
        print(f"Region: {args.region}")
        print(f"Staging Location: {args.staging_location}")
        print(f"Temp Location: {args.temp_location}")
        print(f"Network: {argswork}")
        print(f"Subnetwork: {args.subnetwork}")
        print(f"Service Account Email: {args.service_account_email}")
        
        
        pipeline_options = PipelineOptions(
            pipeline_args,
            network=args.network,
            subnetwork=args.subnetwork,
            save_main_session=True,
            use_public_ips=False,
        )
        google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
        google_cloud_options.project = args.project
        google_cloud_options.region = args.region
        google_cloud_options.staging_location = args.staging_location
        google_cloud_options.temp_location = args.temp_location
        google_cloud_options.service_account_email = args.service_account_email

        standard_options = pipeline_options.view_as(StandardOptions)
        standard_options.runner = args.runner
        
        db_user = "sqlserver" #Retrieve from environment variable
        db_password = "root"#Retrieve from environment variable
        
        jdbc_url = "jdbc:sqlserver://XXX:XXX;databaseName=atra;trustServerCertificate=true;encrypt=false;"
        query = "SELECT TOP 10 * FROM production.brands"
       

        with beam.Pipeline(options=pipeline_options) as pipeline:
            data = pipeline | "Read from SQL Server" >> ReadFromJdbc(
                table_name='brands',
                driver_class_name='com.microsoft.sqlserver.jdbc.SQLServerDriver',
                jdbc_url=jdbc_url,
                username=db_user,
                password=db_password,
                query=query,
                fetch_size=1000,  # Adjust fetch size as needed
                classpath=["gs://XXX/jars/mssql-jdbc-12.8.1.jre11.jar"]
            )
            
            

            data | "Print Data" >> beam.Map(print)


    except Exception as e:
        logging.exception(f"Error: {e}")
        raise  # re-raise so the Flex Template launcher reports the failure


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
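
From what I can tell, ReadFromJdbc is a cross-language transform, so Beam has to start a Java expansion service while the pipeline graph is being built, which is presumably why it looks for a java or docker executable at that point. Assuming the expansion_service parameter works the way the Beam docs describe (I have not verified this), the transform could instead be pointed at an expansion service that is already running; the host and port below are placeholders:

# Sketch, not working code: point ReadFromJdbc at an already-running expansion service.
data = pipeline | "Read from SQL Server" >> ReadFromJdbc(
    table_name='brands',
    driver_class_name='com.microsoft.sqlserver.jdbc.SQLServerDriver',
    jdbc_url=jdbc_url,
    username=db_user,
    password=db_password,
    expansion_service='localhost:8097',  # placeholder address of a running expansion service
)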

Below is how I build the image that goes into Artifact Registry:

 
gcloud auth configure-docker us-east1-docker.pkg.dev

export PROJECT_ID=$(gcloud config get-value project)
export TEMPLATE_PATH="gs://XXXXX/templates/sqlserver-to-bq.json"

gcloud dataflow flex-template build $TEMPLATE_PATH \
  --image-gcr-path "us-east1-docker.pkg.dev/$PROJECT_ID/dataflow-abc-repo/sql-server-to-bq-2:latest" \
  --sdk-language "PYTHON" \
  --flex-template-base-image "PYTHON3" \
  --py-path "." \
  --worker-region us-east1 \
  --service-account-email [email protected] \
  --env "FLEX_TEMPLATE_PYTHON_PY_FILE=main.py" \
  --env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt"

But somehow I keep getting the error message below from Dataflow:

ERROR:root:Error: Cannot start an expansion service since neither Java nor Docker executables are available in the system.

I have tried other images and installed Java, but the error is always the same!

The Apache Beam code works just fine when I run it in Cloud Shell, where Java is installed. The Google Cloud network/subnetwork setup is also working fine from the Dataflow workers: I ran a different job on Dataflow to check whether it could reach the SQL Server instance, and that worked.

Could someone please point me in the right direction, or just share a Dockerfile and the gcloud command to build it correctly? I have already spent a few days on this with no progress!
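
For debugging, a small check like this could go at the top of run() to log whether the process that expands the pipeline actually sees Java or Docker on its PATH (standard library only, nothing extra to install):

# Sketch: log whether java/docker are visible to the launcher process.
import logging
import shutil

logging.info("java on PATH: %s", shutil.which("java"))
logging.info("docker on PATH: %s", shutil.which("docker"))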
