# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import json
import logging
from typing import Any, Optional, TypeVar, Union
from urllib.parse import urlencode

from . import _api_module
from . import _common
from . import types
from ._common import get_value_by_path as getv
from ._common import set_value_by_path as setv

logger = logging.getLogger('google_genai.operations')


def _FetchPredictOperationParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the Vertex request dict for a fetchPredictOperation call.

  Args:
    from_object: Source parameters; ``operation_name`` and ``resource_name``
      are read from it when they are not None.
    parent_object: Accepted for signature parity with the other converters;
      not used here.

  Returns:
    A dict with ``operationName`` at the top level and ``resourceName`` under
    the ``_url`` key (the latter is used to fill the request path template).
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['operation_name']) is not None:
    setv(to_object, ['operationName'], getv(from_object, ['operation_name']))

  if getv(from_object, ['resource_name']) is not None:
    setv(
        to_object,
        ['_url', 'resourceName'],
        getv(from_object, ['resource_name']),
    )

  return to_object


def _GetOperationParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the non-Vertex request dict for a get-operation call.

  Args:
    from_object: Source parameters; ``operation_name`` is read from it when
      it is not None.
    parent_object: Accepted for signature parity; not used here.

  Returns:
    A dict carrying ``operationName`` under the ``_url`` key.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['operation_name']) is not None:
    setv(
        to_object,
        ['_url', 'operationName'],
        getv(from_object, ['operation_name']),
    )

  return to_object


def _GetOperationParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the Vertex request dict for a get-operation call.

  Args:
    from_object: Source parameters; ``operation_name`` is read from it when
      it is not None.
    parent_object: Accepted for signature parity; not used here.

  Returns:
    A dict carrying ``operationName`` under the ``_url`` key.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['operation_name']) is not None:
    setv(
        to_object,
        ['_url', 'operationName'],
        getv(from_object, ['operation_name']),
    )

  return to_object


def _GetProjectOperationParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the Vertex request dict for a project-operation lookup.

  Args:
    from_object: Source parameters; ``operation_id`` is read from it when it
      is not None.
    parent_object: Accepted for signature parity; not used here.

  Returns:
    A dict carrying ``operation_id`` under the ``_url`` key.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['operation_id']) is not None:
    setv(
        to_object, ['_url', 'operation_id'], getv(from_object, ['operation_id'])
    )

  return to_object


def _format_path(template: str, request_dict: dict[str, Any]) -> str:
  """Returns the request path for ``template`` given a converted request.

  Placeholders in ``template`` are filled from ``request_dict['_url']`` when
  that entry is present and non-empty; otherwise the template is returned
  unformatted. A non-empty ``request_dict['_query']`` is URL-encoded and
  appended after a ``?``.
  """
  request_url_dict: Optional[dict[str, str]] = request_dict.get('_url')
  if request_url_dict:
    path = template.format_map(request_url_dict)
  else:
    path = template

  query_params = request_dict.get('_query')
  if query_params:
    path = f'{path}?{urlencode(query_params)}'
  return path


def _resolve_http_options(parameter_model: Any) -> Optional[types.HttpOptions]:
  """Returns ``parameter_model.config.http_options`` when set, else None."""
  config = parameter_model.config
  if config is not None and config.http_options is not None:
    return config.http_options  # type: ignore[no-any-return]
  return None


def _serialize_request(request_dict: dict[str, Any]) -> Any:
  """Drops the ``config`` entry and converts the request for sending.

  Note that ``request_dict`` is mutated: its ``config`` key is popped before
  the conversion helpers from ``_common`` are applied.
  """
  # TODO: remove the hack that pops config.
  request_dict.pop('config', None)
  converted = _common.convert_to_dict(request_dict)
  return _common.encode_unserializable_types(converted)


def _parse_body(response: Any) -> Any:
  """Returns the JSON-decoded ``response.body``, or ``{}`` when it is empty."""
  return {} if not response.body else json.loads(response.body)


def _fetch_predict_config_from(
    config: Optional[types.GetOperationConfigOrDict],
) -> types.FetchPredictOperationConfig:
  """Translates a get-operation config into a fetchPredictOperation config.

  Only ``http_options`` is carried over. It may come from a dict config or a
  ``types.GetOperationConfig``; when it is absent (or ``config`` is neither of
  those), a default ``types.HttpOptions()`` is used.
  """
  http_options = types.HttpOptions()
  if isinstance(config, dict):
    dict_options = config.get('http_options', None)
    if dict_options is not None:
      http_options = types.HttpOptions(**dict(dict_options))
  elif isinstance(config, types.GetOperationConfig):
    if config.http_options is not None:
      http_options = config.http_options
  return types.FetchPredictOperationConfig(http_options=http_options)


class Operations(_api_module.BaseModule):
  """Synchronous access to long-running operations via the API client."""

  def _get_videos_operation(
      self,
      *,
      operation_name: str,
      config: Optional[types.GetOperationConfigOrDict] = None,
  ) -> Any:
    """Issues a GET on the operation's own name and returns the parsed body.

    Args:
      operation_name: Operation name; used verbatim as the request path.
      config: Optional request configuration (only ``http_options`` is read
        here).

    Returns:
      The JSON-decoded response body, or ``{}`` when the body is empty.
    """
    parameter_model = types._GetOperationParameters(
        operation_name=operation_name,
        config=config,
    )

    if self._api_client.vertexai:
      request_dict = _GetOperationParameters_to_vertex(parameter_model)
    else:
      request_dict = _GetOperationParameters_to_mldev(parameter_model)
    path = _format_path('{operationName}', request_dict)
    http_options = _resolve_http_options(parameter_model)
    request_dict = _serialize_request(request_dict)

    response = self._api_client.request('get', path, request_dict, http_options)
    return _parse_body(response)

  def _fetch_predict_videos_operation(
      self,
      *,
      operation_name: str,
      resource_name: str,
      config: Optional[types.FetchPredictOperationConfigOrDict] = None,
  ) -> Any:
    """POSTs to ``{resourceName}:fetchPredictOperation`` (Vertex AI only).

    Args:
      operation_name: Operation name, sent as ``operationName`` in the body.
      resource_name: Resource that prefixes the request path.
      config: Optional request configuration (only ``http_options`` is read
        here).

    Returns:
      The JSON-decoded response body, or ``{}`` when the body is empty.

    Raises:
      ValueError: If the client is not a Vertex AI client.
    """
    parameter_model = types._FetchPredictOperationParameters(
        operation_name=operation_name,
        resource_name=resource_name,
        config=config,
    )

    if not self._api_client.vertexai:
      raise ValueError('This method is only supported in the Vertex AI client.')

    request_dict = _FetchPredictOperationParameters_to_vertex(parameter_model)
    path = _format_path('{resourceName}:fetchPredictOperation', request_dict)
    http_options = _resolve_http_options(parameter_model)
    request_dict = _serialize_request(request_dict)

    response = self._api_client.request(
        'post', path, request_dict, http_options
    )
    return _parse_body(response)

  def _get(
      self,
      *,
      operation_id: str,
      config: Optional[types.GetOperationConfigOrDict] = None,
  ) -> types.ProjectOperation:
    """GETs ``operations/{operation_id}`` (Vertex AI only).

    Args:
      operation_id: Identifier substituted into the request path.
      config: Optional request configuration (only ``http_options`` is read
        here).

    Returns:
      The response parsed into a ``types.ProjectOperation``.

    Raises:
      ValueError: If the client is not a Vertex AI client.
    """
    parameter_model = types._GetProjectOperationParameters(
        operation_id=operation_id,
        config=config,
    )

    if not self._api_client.vertexai:
      raise ValueError('This method is only supported in the Vertex AI client.')

    request_dict = _GetProjectOperationParameters_to_vertex(parameter_model)
    path = _format_path('operations/{operation_id}', request_dict)
    http_options = _resolve_http_options(parameter_model)
    request_dict = _serialize_request(request_dict)

    response = self._api_client.request('get', path, request_dict, http_options)
    response_dict = _parse_body(response)

    return_value = types.ProjectOperation._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )
    self._api_client._verify_response(return_value)
    return return_value

  T = TypeVar('T', bound=types.Operation)

  def get(
      self,
      operation: T,
      *,
      config: Optional[types.GetOperationConfigOrDict] = None,
  ) -> T:
    """Gets the status of an operation.

    Args:
      operation: The operation to refresh; its ``name`` identifies the
        operation and its ``from_api_response`` builds the result.
      config: Optional request configuration. On Vertex AI only its
        ``http_options`` are forwarded.

    Returns:
      A new operation object built from the API response.

    Raises:
      ValueError: If ``operation.name`` is empty.
    """
    # Currently, only GenerateVideosOperation is supported.
    # TODO(b/398040607): Support short form names
    operation_name = operation.name
    if not operation_name:
      raise ValueError('Operation name is empty.')

    # TODO(b/398233524): Cast operation types
    if self._api_client.vertexai:
      # Everything before the last '/operations/' is the owning resource.
      resource_name = operation_name.rpartition('/operations/')[0]
      response_dict = self._fetch_predict_videos_operation(
          operation_name=operation_name,
          resource_name=resource_name,
          config=_fetch_predict_config_from(config),
      )
      response_operation = operation.from_api_response(
          response_dict, is_vertex_ai=True
      )
    else:
      response_dict = self._get_videos_operation(
          operation_name=operation_name,
          config=config,
      )
      response_operation = operation.from_api_response(
          response_dict, is_vertex_ai=False
      )

    self._api_client._verify_response(response_operation)  # type: ignore [arg-type]
    return response_operation  # type: ignore[no-any-return]


class AsyncOperations(_api_module.BaseModule):
  """Asynchronous counterpart of ``Operations``."""

  async def _get_videos_operation(
      self,
      *,
      operation_name: str,
      config: Optional[types.GetOperationConfigOrDict] = None,
  ) -> Any:
    """Issues a GET on the operation's own name and returns the parsed body.

    Args:
      operation_name: Operation name; used verbatim as the request path.
      config: Optional request configuration (only ``http_options`` is read
        here).

    Returns:
      The JSON-decoded response body, or ``{}`` when the body is empty.
    """
    parameter_model = types._GetOperationParameters(
        operation_name=operation_name,
        config=config,
    )

    if self._api_client.vertexai:
      request_dict = _GetOperationParameters_to_vertex(parameter_model)
    else:
      request_dict = _GetOperationParameters_to_mldev(parameter_model)
    path = _format_path('{operationName}', request_dict)
    http_options = _resolve_http_options(parameter_model)
    request_dict = _serialize_request(request_dict)

    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )
    return _parse_body(response)

  async def _fetch_predict_videos_operation(
      self,
      *,
      operation_name: str,
      resource_name: str,
      config: Optional[types.FetchPredictOperationConfigOrDict] = None,
  ) -> Any:
    """POSTs to ``{resourceName}:fetchPredictOperation`` (Vertex AI only).

    Args:
      operation_name: Operation name, sent as ``operationName`` in the body.
      resource_name: Resource that prefixes the request path.
      config: Optional request configuration (only ``http_options`` is read
        here).

    Returns:
      The JSON-decoded response body, or ``{}`` when the body is empty.

    Raises:
      ValueError: If the client is not a Vertex AI client.
    """
    parameter_model = types._FetchPredictOperationParameters(
        operation_name=operation_name,
        resource_name=resource_name,
        config=config,
    )

    if not self._api_client.vertexai:
      raise ValueError('This method is only supported in the Vertex AI client.')

    request_dict = _FetchPredictOperationParameters_to_vertex(parameter_model)
    path = _format_path('{resourceName}:fetchPredictOperation', request_dict)
    http_options = _resolve_http_options(parameter_model)
    request_dict = _serialize_request(request_dict)

    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )
    return _parse_body(response)

  async def _get(
      self,
      *,
      operation_id: str,
      config: Optional[types.GetOperationConfigOrDict] = None,
  ) -> types.ProjectOperation:
    """GETs ``operations/{operation_id}`` (Vertex AI only).

    Args:
      operation_id: Identifier substituted into the request path.
      config: Optional request configuration (only ``http_options`` is read
        here).

    Returns:
      The response parsed into a ``types.ProjectOperation``.

    Raises:
      ValueError: If the client is not a Vertex AI client.
    """
    parameter_model = types._GetProjectOperationParameters(
        operation_id=operation_id,
        config=config,
    )

    if not self._api_client.vertexai:
      raise ValueError('This method is only supported in the Vertex AI client.')

    request_dict = _GetProjectOperationParameters_to_vertex(parameter_model)
    path = _format_path('operations/{operation_id}', request_dict)
    http_options = _resolve_http_options(parameter_model)
    request_dict = _serialize_request(request_dict)

    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )
    response_dict = _parse_body(response)

    return_value = types.ProjectOperation._from_response(
        response=response_dict, kwargs=parameter_model.model_dump()
    )
    self._api_client._verify_response(return_value)
    return return_value

  T = TypeVar('T', bound=types.Operation)

  async def get(
      self,
      operation: T,
      *,
      config: Optional[types.GetOperationConfigOrDict] = None,
  ) -> T:
    """Gets the status of an operation.

    Args:
      operation: The operation to refresh; its ``name`` identifies the
        operation and its ``from_api_response`` builds the result.
      config: Optional request configuration. On Vertex AI only its
        ``http_options`` are forwarded.

    Returns:
      A new operation object built from the API response.

    Raises:
      ValueError: If ``operation.name`` is empty.
    """
    # Currently, only GenerateVideosOperation is supported.
    operation_name = operation.name
    if not operation_name:
      raise ValueError('Operation name is empty.')

    if self._api_client.vertexai:
      # Everything before the last '/operations/' is the owning resource.
      resource_name = operation_name.rpartition('/operations/')[0]
      response_dict = await self._fetch_predict_videos_operation(
          operation_name=operation_name,
          resource_name=resource_name,
          config=_fetch_predict_config_from(config),
      )
      response_operation = operation.from_api_response(
          response_dict, is_vertex_ai=True
      )
    else:
      response_dict = await self._get_videos_operation(
          operation_name=operation_name,
          config=config,
      )
      response_operation = operation.from_api_response(
          response_dict, is_vertex_ai=False
      )

    # Mirror the synchronous ``Operations.get``, which also passes the parsed
    # operation to the client's verification hook before returning it.
    self._api_client._verify_response(response_operation)  # type: ignore [arg-type]
    return response_operation  # type: ignore[no-any-return]