{
"cells": [
{
"cell_type": "markdown",
"id": "f2e271e9-fd57-4c3f-8f35-6554af9dce05",
"metadata": {
"tags": []
},
"source": [
"# Dependencies\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "151fe390-f865-4c5a-b7f2-94672000adbf",
"metadata": {},
"outputs": [],
"source": [
"pip install PyYAML"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f725ba31-5b51-4701-964f-5e17e254fa95",
"metadata": {},
"outputs": [],
"source": [
"!sudo apt-get install build-essential"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "db751d96-a48e-4bdc-8d82-eace3b136520",
"metadata": {},
"outputs": [],
"source": [
"# updated to the latest dependencies on February 2023\n",
"!pip install google-cloud-aiplatform==1.21.0 --upgrade\n",
"!pip install google-cloud-pipeline-components==1.0.27 --upgrade\n",
"!pip install kfp --upgrade\n",
"!pip install google-auth google-auth-oauthlib google-auth-httplib2"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1421d04d-dfd3-446e-b4c2-6d39914d6a0b",
"metadata": {},
"outputs": [],
"source": [
"import kfp\n",
"from kfp import dsl\n",
"from typing import NamedTuple\n",
"from kfp.dsl import pipeline\n",
"from kfp.dsl import component\n",
"from kfp.dsl import OutputPath\n",
"from kfp.dsl import InputPath\n",
"from kfp.dsl import Output\n",
"from kfp.dsl import Metrics\n",
"from kfp import compiler\n",
"#from kfp.v2.google import client as kfp_client\n",
"# from google.cloud.aiplatform import pipeline_jobs\n",
"# from google_cloud_pipeline_components import aiplatform as gcc_aip\n",
"# from google_cloud_pipeline_components.aiplatform import ModelUploadOp\n"
]
},
{
"cell_type": "markdown",
"id": "47dd6814-4122-4f1e-a4d6-d5da8a8b644e",
"metadata": {
"tags": []
},
"source": [
"# Authentication"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "74954ce1-26c1-4f42-96e9-13d655b2d82e",
"metadata": {},
"outputs": [],
"source": [
"# Cloud project id.\n",
"PROJECT_ID = \"YOUR_PROJECT_ID\" # @param {type:\"string\"}\n",
"\n",
"# The region you want to launch jobs in.\n",
"REGION = \"YOUR_REGION # @param {type:\"string\"}\n",
"\n",
"SERVICE_ACCOUNT = \"YOUR_SERVICE_ACCOUNT_EMAIL\" # @param {type:\"string\"}\n",
"\n",
"PIPELINE_ROOT = \"YOUR_GCS_BUCKET_PATH_FOR_PIPELINE\"\n",
"\n",
"STAGING_BUCKET = \"YOUR_GCS_STAGING_BUCKET\"\n",
"\n",
"!set GOOGLE_APPLICATION_CREDENTIALS='YOUR_SERVICE_ACCOUNT_KEY_PATH' #DO NOT UPLOAD THIS TO GITHUB REPO"
]
},
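{
"cell_type": "markdown",
"id": "added-credentials-check-note",
"metadata": {},
"source": [
"Optionally, verify that Application Default Credentials resolve from the key file configured above before launching any jobs. The cell below is a minimal sketch using `google.auth.default()` (the `google-auth` package was installed in the dependencies section)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-credentials-check",
"metadata": {},
"outputs": [],
"source": [
"# Minimal sanity check: resolve Application Default Credentials from the\n",
"# GOOGLE_APPLICATION_CREDENTIALS key file set above and print the detected project.\n",
"import google.auth\n",
"\n",
"credentials, detected_project = google.auth.default()\n",
"print(f\"Credentials loaded for project: {detected_project}\")"
]
},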
{
"cell_type": "markdown",
"id": "20582dae-0d33-46a7-ad47-87304e93fb08",
"metadata": {
"tags": []
},
"source": [
"# Clients"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "723a58e0-1f8d-48f0-810f-58b791bcc7e0",
"metadata": {},
"outputs": [],
"source": [
"from google.cloud import aiplatform\n",
"aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=\"YOUR_STAGING_BUCKET\")"
]
},
{
"cell_type": "markdown",
"id": "58cd61c5-c58f-4e40-a9bc-f9dcb4106e54",
"metadata": {
"tags": []
},
"source": [
"# Pipeline Basics"
]
},
{
"cell_type": "markdown",
"id": "41ad2785-742e-4540-958e-d3ad6e087ce6",
"metadata": {},
"source": [
"## Component-1 : The Training Job"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "395da85a-80c0-44ce-a0c2-0c13d4ec1f3b",
"metadata": {},
"outputs": [],
"source": [
"@dsl.component(base_image='YOUR_CUSTOM_CONTAINER_ARTIFACT_REGISTRY_ENDPOINT') #you must have created a new container and pushed it to the Artifact repo while running the cloud build steps\n",
"def stable_diffusion_training_job_op(\n",
" gcs_class_data_path: str, \n",
" gcs_instance_data_path: str,\n",
" jobId: str,\n",
" jobName: str,\n",
" model_id: str,\n",
" num_nodes: int = 1,\n",
" # machine_type: str = \"a2-highgpu-1g\",\n",
" # gpu_type: str = \"NVIDIA_TESLA_A100\",\n",
" machine_type: str = \"n1-highmem-8\",\n",
" gpu_type: str = \"NVIDIA_TESLA_T4\",\n",
" num_gpus: int = 1\n",
") -> str:\n",
"\n",
" # Import statements\n",
" from google.cloud import storage\n",
" import os\n",
" import base64\n",
" import glob\n",
" from datetime import datetime\n",
" from io import BytesIO\n",
" from typing import NamedTuple\n",
" import json\n",
"\n",
" import requests\n",
" #import torch\n",
" from google.cloud import aiplatform, storage, firestore\n",
" from PIL import Image\n",
" from google.cloud import storage\n",
" from google.oauth2 import service_account\n",
" \n",
"\n",
" model_name = \"YOUR_MODEL_NAME\"\n",
" #model_name = jobId\n",
" service_account_key_path = 'YOUR_SERVICE_ACCOUNT_KEY_PATH' \n",
" credentials = service_account.Credentials.from_service_account_file(service_account_key_path)\n",
"\n",
" INSTANCE_NAME = 'ethlas game avatar' #@param {type:\"string\"} # CHANGE THIS ACCORDING TO YOUR IDEAS/THEMES/STYLES\n",
" CLASS_NAME = '2d game avatar' #@param {type:\"string\"} # CHANGE THIS ACCORDING TO YOUR IDEAS/THEMES/STYLES\n",
" \n",
" REGION = \"europe-west4\" # @param {type:\"string\"}\n",
" \n",
" # Create three folders under the container's home directory\n",
" os.makedirs(\"/home/class\", exist_ok=True)\n",
" os.makedirs(\"/home/instance\", exist_ok=True)\n",
" os.makedirs(\"/home/model_output\", exist_ok=True)\n",
"\n",
" # Set up the job_name\n",
" # prefix = \"ethlas-stable-diffusion\"\n",
" # user = os.environ.get(\"USER\")\n",
" # now = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
" # job_name = f\"{prefix}-{user}-{now}\"\n",
" \n",
" def update_firestore(jobId, job_name, training_status):\n",
" \"\"\"Updates Firestore collection with pipeline job details.\"\"\"\n",
" # Initialize Firestore client\n",
" service_account_key_path = 'YOUR_SERVICE_ACCOUNT_PATH' \n",
" credentials = service_account.Credentials.from_service_account_file(service_account_key_path)\n",
" db = firestore.Client(project=\"YOUR_PROJECT_ID\", credentials=credentials)\n",
"\n",
" # Define the document reference\n",
" doc_ref = db.collection('vertexPipelineJobs').document(jobId)\n",
"\n",
" # Update the document fields\n",
" doc_ref.set({\n",
" \"uploadJobId\": jobId,\n",
" \"vertexPipelineJobId\": job_name,\n",
" \"trainingStatus\": training_status,\n",
" \"modelDeploymentStatus\": \"Not initiated yet\",\n",
" \"modelEndpoint\": \"Not available yet\"\n",
" })\n",
" \n",
" job_name = jobName\n",
"\n",
" # Define the necessary arguments and parameters for the job\n",
" args = [\n",
" \"--method=diffuser_dreambooth\",\n",
" \"--model_name=runwayml/stable-diffusion-v1-5\",\n",
" \"--input_storage=/gcs/ethlas-customer-1/instance\",\n",
" \"--output_storage=/gcs/ethlas-customer-1/output_artifacts\",\n",
" #\"--output_storage=/home/model_output\",\n",
" \"--prompt=a 2D ethlas game avatar\",\n",
" \"--class_prompt=a 2D game avatar\",\n",
" \"--num_class_images=16\",\n",
" \"--lr=1e-4\",\n",
" \"--use_8bit=True\",\n",
" \"--max_train_steps=100\",\n",
" \"--text_encoder=True\",\n",
" \"--set_grads_to_none=True\"\n",
" ]\n",
" #command = [\"python3\", \"train_wo_nfs.py\"]\n",
" command = [\"python3\", \"train_wo_nfs.py\"] + args\n",
"\n",
" # Build the custom Docker container and push it to the container registry (skipping this step)\n",
"\n",
" # Create the job using aiplatform.CustomContainerTrainingJob\n",
" job = aiplatform.CustomContainerTrainingJob(\n",
" display_name=job_name,\n",
" container_uri=\"YOUR_TRAINING_JOB_CONTAINER_ENDPOINT\",\n",
" command=command,\n",
" # replica_count=1,\n",
" # machine_type=\"n1-standard-8\",\n",
" # accelerator_type=\"NVIDIA_TESLA_T4\",\n",
" # accelerator_count=1,\n",
" location=REGION,\n",
" staging_bucket=\"YOUR_STAGING_BUCKET\"\n",
"\n",
" )\n",
"\n",
" training_status = \"Initiated\"\n",
" update_firestore(jobId, jobName, training_status)\n",
"\n",
" try:\n",
" # Run the job\n",
" job.run(\n",
" replica_count=num_nodes,\n",
" machine_type=machine_type,\n",
" accelerator_type=gpu_type,\n",
" accelerator_count=num_gpus\n",
" )\n",
"\n",
" # Job ran successfully, update trainingStatus to \"Completed\"\n",
" training_status = \"Completed\"\n",
" \n",
" except Exception as e:\n",
" # Job failed, update trainingStatus to \"Failed\" or any other appropriate status\n",
" training_status = \"Failed\"\n",
" # Log the error or take any other actions as needed\n",
"\n",
" finally:\n",
" # Update Firestore collection with the final training_status, whether it's completed or failed\n",
" update_firestore(jobId, jobName, training_status)\n",
" \n",
" #model.save(local_model_output_path) \n",
"\n",
" ###########################################################\n",
"\n",
" #\"\"\"Uploads files in a local directory to a GCS directory.\"\"\"\n",
"\n",
" # for local_file in glob.glob(local_model_output_path + \"/**\"):\n",
" # if not os.path.isfile(local_file):\n",
" # continue\n",
" # filename = local_file[1 + len(local_model_output_path) :]\n",
" # gcs_file_path = os.path.join('gs://ethlas-customer-1/output_artifacts/', filename)\n",
" # _, blob_name = get_bucket_and_blob_name(gcs_file_path)\n",
" # blob = bucket.blob(blob_name)\n",
" # blob.upload_from_filename(local_file)\n",
" # print(\"Copied {} to {}.\".format(local_file, gcs_file_path))\n",
" \n",
" model_output = \"YOUR_OUTPUT_ARTIFACTS\"\n",
" return model_output\n"
]
},
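{
"cell_type": "markdown",
"id": "added-firestore-status-note",
"metadata": {},
"source": [
"The component above tracks its progress by writing a status document to the `vertexPipelineJobs` Firestore collection. The sketch below reads that document back outside the pipeline to follow a job; `YOUR_JOB_ID` is a placeholder for the `jobId` you pass into the pipeline."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-firestore-status-check",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: read back the status document written by the training component.\n",
"# YOUR_PROJECT_ID and YOUR_JOB_ID are placeholders, not values defined in this notebook.\n",
"from google.cloud import firestore\n",
"\n",
"db = firestore.Client(project=\"YOUR_PROJECT_ID\")\n",
"doc = db.collection(\"vertexPipelineJobs\").document(\"YOUR_JOB_ID\").get()\n",
"print(doc.to_dict())  # trainingStatus, modelDeploymentStatus, modelEndpoint, ..."
]
},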
{
"cell_type": "code",
"execution_count": null,
"id": "0571d6e9-6576-46a4-8ee4-2f7612d9252c",
"metadata": {},
"outputs": [],
"source": [
"@dsl.component(base_image='YOUR_CUSTOM_CONTAINER_ARTIFACT_REGISTRY_ENDPOINT_PATH')\n",
"def stable_diffusion_model_deployment_job_op(\n",
" model_id: str,\n",
" task: str,\n",
" jobId: str,\n",
" jobName: str,\n",
") -> NamedTuple(\"Outputs\", [('model', str), (\"endpoint_name\", str), (\"endpoint_id\", str)]):\n",
"\n",
" # Import statements\n",
"\n",
" from google.cloud import storage\n",
" import os\n",
" import base64\n",
" import glob\n",
" from datetime import datetime\n",
" from io import BytesIO\n",
" from typing import NamedTuple\n",
" import json\n",
"\n",
" import requests\n",
" #import torch\n",
" from google.cloud import aiplatform, storage, firestore\n",
" from PIL import Image\n",
" from google.cloud import storage\n",
" from google.oauth2 import service_account\n",
" \n",
" def update_firestore(jobId, job_name, model_deployment_status, model_deployment_endpoint):\n",
" \"\"\"Updates Firestore collection with pipeline job details.\"\"\"\n",
" # Initialize Firestore client\n",
"\n",
" service_account_key_path = 'YOUR_SERVICE_ACCOUNT_PATH'\n",
" credentials = service_account.Credentials.from_service_account_file(service_account_key_path)\n",
" db = firestore.Client(project=\"YOUR_PROJECT_ID\", credentials=credentials)\n",
"\n",
" # Define the document reference\n",
" doc_ref = db.collection('vertexPipelineJobs').document(jobId)\n",
"\n",
" # Update the document fields\n",
" doc_ref.set({\n",
" \"uploadJobId\": jobId,\n",
" \"vertexPipelineJobId\": job_name,\n",
" \"trainingStatus\": \"completed\",\n",
" \"modelDeploymentStatus\": model_deployment_status,\n",
" \"modelEndpoint\": model_deployment_endpoint\n",
" })\n",
"\n",
" model_name = \"YOUR_MODEL_NAME\"\n",
" #model_name = jobId\n",
" service_account_key_path = 'YOUR_SERVICE_ACCOUNT_KEY_PATH'\n",
" credentials = service_account.Credentials.from_service_account_file(service_account_key_path)\n",
" endpoint = aiplatform.Endpoint.create(display_name=f\"{jobId}-{task}-endpoint\", credentials=credentials)\n",
" SERVICE_ACCOUNT = \"YOUR_SERVICE_ACCOUNT_EMAIL\"\n",
" SERVE_DOCKER_URI = \"YOUR_SERVING_DOCKER_CONTAINER_URI\"\n",
"\n",
" # Serving environment variables\n",
" serving_env = {\n",
" \"MODEL_ID\": model_id,\n",
" \"TASK\": task,\n",
" }\n",
"\n",
" model = aiplatform.Model.upload(\n",
" display_name=model_name,\n",
" serving_container_image_uri=SERVE_DOCKER_URI,\n",
" serving_container_ports=[7080],\n",
" serving_container_predict_route=\"/predictions/diffusers_serving\",\n",
" serving_container_health_route=\"/ping\",\n",
" serving_container_environment_variables=serving_env,\n",
" )\n",
"\n",
" updated_model_id = model.resource_name.split(\"/\")[-1]\n",
"\n",
" # Set deployment_status to \"initiated\"\n",
" deployment_status = \"Initiated\"\n",
" deployment_endpoint = \"Not available yet\"\n",
" update_firestore(jobId, jobName, deployment_status, deployment_endpoint)\n",
"\n",
" try:\n",
" updated_endpoint = model.deploy(\n",
" endpoint=endpoint,\n",
" machine_type=\"n1-standard-8\",\n",
" accelerator_type=\"NVIDIA_TESLA_V100\",\n",
" accelerator_count=1,\n",
" deploy_request_timeout=1800,\n",
" service_account=SERVICE_ACCOUNT,\n",
" )\n",
" updated_endpoint_name = updated_endpoint.resource_name\n",
" updated_endpoint_id = updated_endpoint_name.split(\"/\")[-1]\n",
"\n",
" # Deployment succeeded, update deployment_status to \"Completed\"\n",
" deployment_status = \"Completed\"\n",
" deployment_endpoint = updated_endpoint.resource_name\n",
"\n",
" except Exception as e:\n",
" # Deployment failed, update deployment_status to \"Failed\" or any other appropriate status\n",
" deployment_status = \"Failed\"\n",
" deployment_endpoint = \"Not available yet\"\n",
" # Log the error or take any other actions as needed\n",
"\n",
" finally:\n",
" # Update Firestore collection with the final deployment_status, whether it's completed or failed\n",
" update_firestore(jobId, jobName, deployment_status, deployment_endpoint)\n",
"\n",
" # Return the model and endpoint details\n",
" return (updated_model_id, updated_endpoint_name, updated_endpoint_id)\n"
]
},
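{
"cell_type": "markdown",
"id": "added-endpoint-predict-note",
"metadata": {},
"source": [
"Once the deployment component completes, the fine-tuned model is served behind a Vertex AI endpoint. The sketch below sends a test request with the SDK; `YOUR_ENDPOINT_ID` is the `endpoint_id` output of the component (also visible in the Vertex AI console), and the instance payload shown (a plain `prompt` field) is an assumption that must match whatever request schema your serving container expects."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-endpoint-predict-sketch",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: query the deployed endpoint. The instance schema below is an assumption and\n",
"# depends on how the serving container parses prediction requests.\n",
"from google.cloud import aiplatform\n",
"\n",
"endpoint = aiplatform.Endpoint(\"YOUR_ENDPOINT_ID\")\n",
"response = endpoint.predict(instances=[{\"prompt\": \"a 2D ethlas game avatar\"}])\n",
"print(response.predictions)"
]
},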
{
"cell_type": "markdown",
"id": "60873567-4599-4896-bd75-a413d8337c2d",
"metadata": {},
"source": [
"## Main Pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5e378129-aa75-4944-8d57-59b629e58488",
"metadata": {},
"outputs": [],
"source": [
"# Define the main pipeline function\n",
"@dsl.pipeline(\n",
" name='Ethlas Stable Diffusion Training Pipeline',\n",
" description='A pipeline for custom training of the Stable Diffusion model'\n",
")\n",
"\n",
"def stable_diffusion_training_pipeline(\n",
" gcs_class_data_path: str = 'gs://<YOUR_BUCKET_NAME>/class',\n",
" gcs_instance_data_path: str = 'gs://<YOUR_BUCKET_NAME>/instance',\n",
" jobId: str = \"defaultname\",\n",
"):\n",
"\n",
" # stable_diffusion_training_job_task = stable_diffusion_training_job_op(\n",
" # model_id=\"runwayml/stable-diffusion-v1-5\",\n",
" # gcs_class_data_path=gcs_class_data_path,\n",
" # gcs_instance_data_path=gcs_instance_data_path,\n",
" # jobId=jobId\n",
" # ).add_node_selector_constraint('NVIDIA_TESLA_T4').set_gpu_limit(1)\n",
" from datetime import datetime\n",
" import os\n",
"\n",
" prefix = f\"ethlas-sd-{jobId}\"\n",
" #user = os.environ.get(\"USER\")\n",
" now = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
" job_name = f\"{prefix}-{now}\"\n",
" \n",
" \n",
" stable_diffusion_training_job_task = stable_diffusion_training_job_op(\n",
" model_id=\"runwayml/stable-diffusion-v1-5\",\n",
" gcs_class_data_path=gcs_class_data_path,\n",
" gcs_instance_data_path=gcs_instance_data_path,\n",
" jobId=jobId,\n",
" jobName=job_name\n",
" ).add_node_selector_constraint('NVIDIA_TESLA_T4').set_gpu_limit(1)\n",
" \n",
" stable_diffusion_training_job_task.set_cpu_request('16')\n",
" stable_diffusion_training_job_task.set_memory_request('64Gi')\n",
"\n",
" \n",
" \n",
" #output_path = stable_diffusion_training_job_task.outputs[\"model_output\"]\n",
" stable_diffusion_model_deployment_job_task = stable_diffusion_model_deployment_job_op(\n",
" #model_id=\"runwayml/stable-diffusion-v1-5\",\n",
" #model_id=stable_diffusion_training_job_task.outputs['model_output'],\n",
" model_id = stable_diffusion_training_job_task.output,\n",
" task=\"image-to-image\",\n",
" jobId=jobId,\n",
" jobName=job_name\n",
" )\n",
" \n",
" #final_model = stable_diffusion_model_deployment_job_task.outputs[\"model\"]\n",
" model_id = stable_diffusion_model_deployment_job_task.outputs[\"model\"]\n",
" endpoint_name = stable_diffusion_model_deployment_job_task.outputs[\"endpoint_name\"]\n",
" endpoint_id = stable_diffusion_model_deployment_job_task.outputs[\"endpoint_id\"]\n",
" \n",
" "
]
},
{
"cell_type": "markdown",
"id": "ff1471fe-8d05-4f8a-a3a8-4ef9361a5d06",
"metadata": {},
"source": [
"## Compile\n",
"\n",
"The compiler takes our pipeline function and compiles it into our pipeline specifiction as json file. This json file we can use to create our pipeline in Vertex AI Pipelines."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "67805950-d511-4895-97fd-b078a5d30ac0",
"metadata": {},
"outputs": [],
"source": [
"\n",
"\n",
"# Compile and run the pipeline\n",
"if __name__ == '__main__':\n",
" # Compile the pipeline to generate a YAML representation\n",
" compiler.Compiler().compile(\n",
" pipeline_func=stable_diffusion_training_pipeline,\n",
" package_path='stable_diffusion_pipeline.yaml'\n",
" )"
]
},
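{
"cell_type": "markdown",
"id": "added-inspect-spec-note",
"metadata": {},
"source": [
"As a quick check, you can load the compiled YAML with PyYAML (installed at the top of the notebook) and confirm that both components made it into the pipeline spec. The exact top-level layout varies slightly between KFP SDK versions, hence the fallback lookup below."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-inspect-spec",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: peek at the compiled pipeline spec and list its components.\n",
"import yaml\n",
"\n",
"with open(\"stable_diffusion_pipeline.yaml\") as f:\n",
"    spec = yaml.safe_load(f)\n",
"\n",
"# Newer KFP SDKs put components at the top level; older v2 output nests them under pipelineSpec.\n",
"components = spec.get(\"components\") or spec.get(\"pipelineSpec\", {}).get(\"components\", {})\n",
"print(list(components.keys()))"
]
},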
{
"cell_type": "markdown",
"id": "77dac2e6-1ee3-49dd-9301-a835e39dffb9",
"metadata": {},
"source": [
"## Run\n",
"\n",
"Create the run job using the API.\n",
"You can also directly upload the pipeline JSON file in the Vertx AI UI."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "14e072e5-e5e8-45e8-9853-bc77722ee77c",
"metadata": {},
"outputs": [],
"source": [
"\n",
"job = aiplatform.PipelineJob(\n",
" display_name=\"YOUR_PIPELINE_NAME\",\n",
" template_path=\"stable_diffusion_pipeline.yaml\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
" enable_caching=False,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9217e0e8-0d6e-42d4-8555-49ca7cd29591",
"metadata": {},
"outputs": [],
"source": [
"job.run(sync=False)\n",
"#! rm stable_diffusion_pipeline.yaml"
]
}
],
"metadata": {
"environment": {
"kernel": "python3",
"name": "common-cu113.m109",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/base-cu113:m109"
},
"kernelspec": {
"display_name": "Python 3.11.0 64-bit",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.0"
},
"vscode": {
"interpreter": {
"hash": "aee8b7b246df8f9039afb4144a1f6fd8d2ca17a180786b69acc140d282b71a49"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}