# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import json
import logging
from typing import Any, Optional, Union
from urllib.parse import urlencode

from . import _api_module
from . import _common
from . import _extra_utils
from . import _transformers as t
from . import types
from ._api_client import BaseApiClient
from ._common import get_value_by_path as getv
from ._common import move_value_by_path as movev
from ._common import set_value_by_path as setv
from .pagers import AsyncPager, Pager

logger = logging.getLogger('google_genai.batches')

# Converter conventions used by the functions below:
#   _<Type>_to_mldev    builds a camelCase dict from snake_case input and
#                       raises ValueError ("... not supported in Gemini API.")
#                       for fields it rejects.
#   _<Type>_to_vertex   builds a camelCase dict from snake_case input and
#                       raises ValueError ("... not supported in Vertex AI.")
#                       for fields it rejects.
#   _<Type>_from_mldev / _<Type>_from_vertex
#                       build a snake_case dict from camelCase input.
# Every converter reads `from_object` (a dict or an object) through `getv`,
# copies a field only when its value is not None, and returns the dict it
# built (`to_object`). A few "config" converters write into the
# caller-supplied `parent_object` instead and return an empty dict.
# NOTE(review): the '_url' and '_query' top-level keys are presumably consumed
# by the request builder as URL path / query parameters - confirm at the call
# sites, which are outside this part of the file.


def _BatchJobDestination_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a camelCase batch-job destination into snake_case form.

  Maps `responsesFile` to `file_name`, converts every inlined response with
  `_InlinedResponse_from_mldev`, and copies inlined embed-content responses
  into a new list without converting them.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['responsesFile']) is not None:
    setv(to_object, ['file_name'], getv(from_object, ['responsesFile']))
  # The list is read from a doubly nested 'inlinedResponses' key.
  if getv(from_object, ['inlinedResponses', 'inlinedResponses']) is not None:
    setv(
        to_object,
        ['inlined_responses'],
        [
            _InlinedResponse_from_mldev(item, to_object)
            for item in getv(
                from_object, ['inlinedResponses', 'inlinedResponses']
            )
        ],
    )
  if (
      getv(from_object, ['inlinedEmbedContentResponses', 'inlinedResponses'])
      is not None
  ):
    setv(
        to_object,
        ['inlined_embed_content_responses'],
        [
            item
            for item in getv(
                from_object,
                ['inlinedEmbedContentResponses', 'inlinedResponses'],
            )
        ],
    )
  return to_object


def _BatchJobDestination_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a camelCase Vertex output config into a snake_case destination.

  Maps `predictionsFormat`, `gcsDestination.outputUriPrefix` and
  `bigqueryDestination.outputUri` to `format`, `gcs_uri` and `bigquery_uri`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['predictionsFormat']) is not None:
    setv(to_object, ['format'], getv(from_object, ['predictionsFormat']))
  if getv(from_object, ['gcsDestination', 'outputUriPrefix']) is not None:
    setv(
        to_object,
        ['gcs_uri'],
        getv(from_object, ['gcsDestination', 'outputUriPrefix']),
    )
  if getv(from_object, ['bigqueryDestination', 'outputUri']) is not None:
    setv(
        to_object,
        ['bigquery_uri'],
        getv(from_object, ['bigqueryDestination', 'outputUri']),
    )
  return to_object


def _BatchJobDestination_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a snake_case destination into a camelCase Vertex output config.

  Inverse field mapping of `_BatchJobDestination_from_vertex`.

  Raises:
    ValueError: If `file_name`, `inlined_responses` or
      `inlined_embed_content_responses` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['format']) is not None:
    setv(to_object, ['predictionsFormat'], getv(from_object, ['format']))
  if getv(from_object, ['gcs_uri']) is not None:
    setv(
        to_object,
        ['gcsDestination', 'outputUriPrefix'],
        getv(from_object, ['gcs_uri']),
    )
  if getv(from_object, ['bigquery_uri']) is not None:
    setv(
        to_object,
        ['bigqueryDestination', 'outputUri'],
        getv(from_object, ['bigquery_uri']),
    )
  if getv(from_object, ['file_name']) is not None:
    raise ValueError('file_name parameter is not supported in Vertex AI.')
  if getv(from_object, ['inlined_responses']) is not None:
    raise ValueError(
        'inlined_responses parameter is not supported in Vertex AI.'
    )
  if getv(from_object, ['inlined_embed_content_responses']) is not None:
    raise ValueError(
        'inlined_embed_content_responses parameter is not supported in'
        ' Vertex AI.'
    )
  return to_object


def _BatchJobSource_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a camelCase Vertex input config into a snake_case source.

  Maps `instancesFormat`, `gcsSource.uris` and `bigquerySource.inputUri` to
  `format`, `gcs_uri` and `bigquery_uri`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['instancesFormat']) is not None:
    setv(to_object, ['format'], getv(from_object, ['instancesFormat']))
  if getv(from_object, ['gcsSource', 'uris']) is not None:
    setv(to_object, ['gcs_uri'], getv(from_object, ['gcsSource', 'uris']))
  if getv(from_object, ['bigquerySource', 'inputUri']) is not None:
    setv(
        to_object,
        ['bigquery_uri'],
        getv(from_object, ['bigquerySource', 'inputUri']),
    )
  return to_object


def _BatchJobSource_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a snake_case batch-job source into its Gemini API form.

  `file_name` becomes `fileName`; each inlined request is converted with
  `_InlinedRequest_to_mldev` and stored under `requests.requests`.

  Raises:
    ValueError: If `format`, `gcs_uri` or `bigquery_uri` is set.
  """
  to_object: dict[str, Any] = {}
  # Unsupported fields are rejected before anything is copied.
  if getv(from_object, ['format']) is not None:
    raise ValueError('format parameter is not supported in Gemini API.')
  if getv(from_object, ['gcs_uri']) is not None:
    raise ValueError('gcs_uri parameter is not supported in Gemini API.')
  if getv(from_object, ['bigquery_uri']) is not None:
    raise ValueError('bigquery_uri parameter is not supported in Gemini API.')
  if getv(from_object, ['file_name']) is not None:
    setv(to_object, ['fileName'], getv(from_object, ['file_name']))
  if getv(from_object, ['inlined_requests']) is not None:
    setv(
        to_object,
        ['requests', 'requests'],
        [
            _InlinedRequest_to_mldev(api_client, item, to_object)
            for item in getv(from_object, ['inlined_requests'])
        ],
    )
  return to_object


def _BatchJobSource_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a snake_case batch-job source into a Vertex input config.

  Inverse field mapping of `_BatchJobSource_from_vertex`.

  Raises:
    ValueError: If `file_name` or `inlined_requests` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['format']) is not None:
    setv(to_object, ['instancesFormat'], getv(from_object, ['format']))
  if getv(from_object, ['gcs_uri']) is not None:
    setv(to_object, ['gcsSource', 'uris'], getv(from_object, ['gcs_uri']))
  if getv(from_object, ['bigquery_uri']) is not None:
    setv(
        to_object,
        ['bigquerySource', 'inputUri'],
        getv(from_object, ['bigquery_uri']),
    )
  if getv(from_object, ['file_name']) is not None:
    raise ValueError('file_name parameter is not supported in Vertex AI.')
  if getv(from_object, ['inlined_requests']) is not None:
    raise ValueError(
        'inlined_requests parameter is not supported in Vertex AI.'
    )
  return to_object


def _BatchJob_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a camelCase batch job (fields under `metadata`) to snake_case.

  Only `name` is read from the top level; every other field is read from
  under `metadata`. `state` is passed through `t.t_job_state`, and
  `metadata.output` through `t.t_recv_batch_job_destination` followed by
  `_BatchJobDestination_from_mldev`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))
  if getv(from_object, ['metadata', 'displayName']) is not None:
    setv(
        to_object,
        ['display_name'],
        getv(from_object, ['metadata', 'displayName']),
    )
  if getv(from_object, ['metadata', 'state']) is not None:
    setv(
        to_object,
        ['state'],
        t.t_job_state(getv(from_object, ['metadata', 'state'])),
    )
  if getv(from_object, ['metadata', 'createTime']) is not None:
    setv(
        to_object,
        ['create_time'],
        getv(from_object, ['metadata', 'createTime']),
    )
  if getv(from_object, ['metadata', 'endTime']) is not None:
    setv(to_object, ['end_time'], getv(from_object, ['metadata', 'endTime']))
  if getv(from_object, ['metadata', 'updateTime']) is not None:
    setv(
        to_object,
        ['update_time'],
        getv(from_object, ['metadata', 'updateTime']),
    )
  if getv(from_object, ['metadata', 'model']) is not None:
    setv(to_object, ['model'], getv(from_object, ['metadata', 'model']))
  if getv(from_object, ['metadata', 'output']) is not None:
    setv(
        to_object,
        ['dest'],
        _BatchJobDestination_from_mldev(
            t.t_recv_batch_job_destination(
                getv(from_object, ['metadata', 'output'])
            ),
            to_object,
        ),
    )
  return to_object


def _BatchJob_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a camelCase Vertex batch job into snake_case form.

  All fields are read from the top level. `inputConfig` becomes `src` via
  `_BatchJobSource_from_vertex`; `outputConfig` becomes `dest` via
  `t.t_recv_batch_job_destination` and `_BatchJobDestination_from_vertex`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))
  if getv(from_object, ['displayName']) is not None:
    setv(to_object, ['display_name'], getv(from_object, ['displayName']))
  if getv(from_object, ['state']) is not None:
    setv(to_object, ['state'], t.t_job_state(getv(from_object, ['state'])))
  if getv(from_object, ['error']) is not None:
    setv(to_object, ['error'], getv(from_object, ['error']))
  if getv(from_object, ['createTime']) is not None:
    setv(to_object, ['create_time'], getv(from_object, ['createTime']))
  if getv(from_object, ['startTime']) is not None:
    setv(to_object, ['start_time'], getv(from_object, ['startTime']))
  if getv(from_object, ['endTime']) is not None:
    setv(to_object, ['end_time'], getv(from_object, ['endTime']))
  if getv(from_object, ['updateTime']) is not None:
    setv(to_object, ['update_time'], getv(from_object, ['updateTime']))
  if getv(from_object, ['model']) is not None:
    setv(to_object, ['model'], getv(from_object, ['model']))
  if getv(from_object, ['inputConfig']) is not None:
    setv(
        to_object,
        ['src'],
        _BatchJobSource_from_vertex(
            getv(from_object, ['inputConfig']), to_object
        ),
    )
  if getv(from_object, ['outputConfig']) is not None:
    setv(
        to_object,
        ['dest'],
        _BatchJobDestination_from_vertex(
            t.t_recv_batch_job_destination(getv(from_object, ['outputConfig'])),
            to_object,
        ),
    )
  if getv(from_object, ['completionStats']) is not None:
    setv(
        to_object, ['completion_stats'], getv(from_object, ['completionStats'])
    )
  return to_object


def _Blob_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a snake_case blob (`data`, `mime_type`) to camelCase.

  Raises:
    ValueError: If `display_name` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['data']) is not None:
    setv(to_object, ['data'], getv(from_object, ['data']))
  if getv(from_object, ['display_name']) is not None:
    raise ValueError('display_name parameter is not supported in Gemini API.')
  if getv(from_object, ['mime_type']) is not None:
    setv(to_object, ['mimeType'], getv(from_object, ['mime_type']))
  return to_object


def _CancelBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the cancel-batch-job request dict (mldev variant).

  Stores `name`, passed through `t.t_batch_job_name`, under `_url.name`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )
  return to_object


def _CancelBatchJobParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the cancel-batch-job request dict (vertex variant).

  Same body as `_CancelBatchJobParameters_to_mldev`: stores `name`, passed
  through `t.t_batch_job_name`, under `_url.name`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )
  return to_object


def _Candidate_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a camelCase candidate into snake_case form.

  `citationMetadata` is converted with `_CitationMetadata_from_mldev`;
  `safetyRatings` is copied into a new list; all other fields are copied
  unchanged under their snake_case names.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['content']) is not None:
    setv(to_object, ['content'], getv(from_object, ['content']))
  if getv(from_object, ['citationMetadata']) is not None:
    setv(
        to_object,
        ['citation_metadata'],
        _CitationMetadata_from_mldev(
            getv(from_object, ['citationMetadata']), to_object
        ),
    )
  if getv(from_object, ['tokenCount']) is not None:
    setv(to_object, ['token_count'], getv(from_object, ['tokenCount']))
  if getv(from_object, ['finishReason']) is not None:
    setv(to_object, ['finish_reason'], getv(from_object, ['finishReason']))
  if getv(from_object, ['avgLogprobs']) is not None:
    setv(to_object, ['avg_logprobs'], getv(from_object, ['avgLogprobs']))
  if getv(from_object, ['groundingMetadata']) is not None:
    setv(
        to_object,
        ['grounding_metadata'],
        getv(from_object, ['groundingMetadata']),
    )
  if getv(from_object, ['index']) is not None:
    setv(to_object, ['index'], getv(from_object, ['index']))
  if getv(from_object, ['logprobsResult']) is not None:
    setv(to_object, ['logprobs_result'], getv(from_object, ['logprobsResult']))
  if getv(from_object, ['safetyRatings']) is not None:
    setv(
        to_object,
        ['safety_ratings'],
        [item for item in getv(from_object, ['safetyRatings'])],
    )
  if getv(from_object, ['urlContextMetadata']) is not None:
    setv(
        to_object,
        ['url_context_metadata'],
        getv(from_object, ['urlContextMetadata']),
    )
  return to_object


def _CitationMetadata_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Copies `citationSources` into a new list stored under `citations`."""
  to_object: dict[str, Any] = {}
  if getv(from_object, ['citationSources']) is not None:
    setv(
        to_object,
        ['citations'],
        [item for item in getv(from_object, ['citationSources'])],
    )
  return to_object


def _Content_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a content object: each part via `_Part_to_mldev`, plus `role`.

  Raises:
    ValueError: Propagated from `_Part_to_mldev` when a part carries a field
      that converter rejects.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['parts']) is not None:
    setv(
        to_object,
        ['parts'],
        [
            _Part_to_mldev(item, to_object)
            for item in getv(from_object, ['parts'])
        ],
    )
  if getv(from_object, ['role']) is not None:
    setv(to_object, ['role'], getv(from_object, ['role']))
  return to_object


def _CreateBatchJobConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Writes create-batch-job config fields into `parent_object` (mldev).

  Args:
    from_object: The snake_case config.
    parent_object: Dict that receives `batch.displayName`. It is handed to
      `setv` as-is whenever `display_name` is set, so callers are expected to
      supply it in that case.

  Returns:
    An empty dict; nothing is written to the local result.

  Raises:
    ValueError: If `dest` is set. `parent_object` may already have been
      updated with the display name when this is raised.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['display_name']) is not None:
    setv(
        parent_object,
        ['batch', 'displayName'],
        getv(from_object, ['display_name']),
    )
  if getv(from_object, ['dest']) is not None:
    raise ValueError('dest parameter is not supported in Gemini API.')
  return to_object


def _CreateBatchJobConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Writes create-batch-job config fields into `parent_object` (vertex).

  Args:
    from_object: The snake_case config.
    parent_object: Dict that receives `displayName` and `outputConfig`. It is
      handed to `setv` as-is, so callers are expected to supply it whenever
      one of those fields is set.

  Returns:
    An empty dict; nothing is written to the local result.

  Raises:
    ValueError: Propagated from `_BatchJobDestination_to_vertex`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['display_name']) is not None:
    setv(parent_object, ['displayName'], getv(from_object, ['display_name']))
  if getv(from_object, ['dest']) is not None:
    setv(
        parent_object,
        ['outputConfig'],
        _BatchJobDestination_to_vertex(
            t.t_batch_job_destination(getv(from_object, ['dest'])), to_object
        ),
    )
  return to_object


def _CreateBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the create-batch-job request dict (mldev variant).

  `model` goes to `_url.model` via `t.t_model`; `src` goes to
  `batch.inputConfig` via `t.t_batch_job_source` and
  `_BatchJobSource_to_mldev`; `config` is applied by
  `_CreateBatchJobConfig_to_mldev`, which writes into the dict built here.

  Raises:
    ValueError: Propagated from the nested converters.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['model']) is not None:
    setv(
        to_object,
        ['_url', 'model'],
        t.t_model(api_client, getv(from_object, ['model'])),
    )
  if getv(from_object, ['src']) is not None:
    setv(
        to_object,
        ['batch', 'inputConfig'],
        _BatchJobSource_to_mldev(
            api_client,
            t.t_batch_job_source(api_client, getv(from_object, ['src'])),
            to_object,
        ),
    )
  if getv(from_object, ['config']) is not None:
    # Return value ignored: the config converter mutates `to_object`.
    _CreateBatchJobConfig_to_mldev(getv(from_object, ['config']), to_object)
  return to_object


def _CreateBatchJobParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the create-batch-job request dict (vertex variant).

  `model` goes to `model` via `t.t_model`; `src` goes to `inputConfig` via
  `t.t_batch_job_source` and `_BatchJobSource_to_vertex`; `config` is applied
  by `_CreateBatchJobConfig_to_vertex`, which writes into the dict built here.

  Raises:
    ValueError: Propagated from the nested converters.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['model']) is not None:
    setv(
        to_object,
        ['model'],
        t.t_model(api_client, getv(from_object, ['model'])),
    )
  if getv(from_object, ['src']) is not None:
    setv(
        to_object,
        ['inputConfig'],
        _BatchJobSource_to_vertex(
            t.t_batch_job_source(api_client, getv(from_object, ['src'])),
            to_object,
        ),
    )
  if getv(from_object, ['config']) is not None:
    # Return value ignored: the config converter mutates `to_object`.
    _CreateBatchJobConfig_to_vertex(getv(from_object, ['config']), to_object)
  return to_object


def _CreateEmbeddingsBatchJobConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Writes `display_name` into `parent_object` as `batch.displayName`.

  Returns:
    An empty dict; nothing is written to the local result.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['display_name']) is not None:
    setv(
        parent_object,
        ['batch', 'displayName'],
        getv(from_object, ['display_name']),
    )
  return to_object


def _CreateEmbeddingsBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the create-embeddings-batch-job request dict.

  `model` goes to `_url.model` via `t.t_model`; `src` goes to
  `batch.inputConfig` via `_EmbeddingsBatchJobSource_to_mldev`; `config` is
  applied by `_CreateEmbeddingsBatchJobConfig_to_mldev`, which writes into
  the dict built here.

  Raises:
    ValueError: Propagated from the nested converters.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['model']) is not None:
    setv(
        to_object,
        ['_url', 'model'],
        t.t_model(api_client, getv(from_object, ['model'])),
    )
  if getv(from_object, ['src']) is not None:
    setv(
        to_object,
        ['batch', 'inputConfig'],
        _EmbeddingsBatchJobSource_to_mldev(
            api_client, getv(from_object, ['src']), to_object
        ),
    )
  if getv(from_object, ['config']) is not None:
    # Return value ignored: the config converter mutates `to_object`.
    _CreateEmbeddingsBatchJobConfig_to_mldev(
        getv(from_object, ['config']), to_object
    )
  return to_object


def _DeleteBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the delete-batch-job request dict (mldev variant).

  Stores `name`, passed through `t.t_batch_job_name`, under `_url.name`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )
  return to_object


def _DeleteBatchJobParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the delete-batch-job request dict (vertex variant).

  Same body as `_DeleteBatchJobParameters_to_mldev`: stores `name`, passed
  through `t.t_batch_job_name`, under `_url.name`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )
  return to_object


def _DeleteResourceJob_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a delete-job result (mldev variant) to snake_case.

  Copies `sdkHttpResponse` (as `sdk_http_response`), `name`, `done` and
  `error`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )
  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))
  if getv(from_object, ['done']) is not None:
    setv(to_object, ['done'], getv(from_object, ['done']))
  if getv(from_object, ['error']) is not None:
    setv(to_object, ['error'], getv(from_object, ['error']))
  return to_object


def _DeleteResourceJob_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a delete-job result (vertex variant) to snake_case.

  Same body as `_DeleteResourceJob_from_mldev`: copies `sdkHttpResponse`
  (as `sdk_http_response`), `name`, `done` and `error`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )
  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))
  if getv(from_object, ['done']) is not None:
    setv(to_object, ['done'], getv(from_object, ['done']))
  if getv(from_object, ['error']) is not None:
    setv(to_object, ['error'], getv(from_object, ['error']))
  return to_object


def _EmbedContentBatch_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts an embed-content batch (`contents`, `config`) for the Gemini API.

  `contents` is passed through `t.t_contents_for_embed` and written with the
  path `requests[] / request / content`. `config` is applied by
  `_EmbedContentConfig_to_mldev`, which writes into the dict built here.

  Raises:
    ValueError: Propagated from `_EmbedContentConfig_to_mldev`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['contents']) is not None:
    # NOTE(review): the '[]' suffix is interpreted by `setv`; presumably it
    # addresses every element of the `requests` list - confirm in
    # `_common.set_value_by_path`.
    setv(
        to_object,
        ['requests[]', 'request', 'content'],
        [
            item
            for item in t.t_contents_for_embed(
                api_client, getv(from_object, ['contents'])
            )
        ],
    )
  if getv(from_object, ['config']) is not None:
    # The config converter writes its fields into `to_object` (its parent);
    # its own return value is stored through the special '_self' path.
    setv(
        to_object,
        ['_self'],
        _EmbedContentConfig_to_mldev(getv(from_object, ['config']), to_object),
    )
    # NOTE(review): the original indentation was lost in this copy of the
    # file; this call is assumed to belong to the `config` branch - confirm
    # against the generator output.
    movev(to_object, {'requests[].*': 'requests[].request.*'})
  return to_object


def _EmbedContentConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Writes embed-content config fields into `parent_object`.

  `task_type`, `title` and `output_dimensionality` are written under the
  `requests[]` path of `parent_object` with camelCase names.

  Returns:
    An empty dict; nothing is written to the local result.

  Raises:
    ValueError: If `mime_type` or `auto_truncate` is set. `parent_object` may
      already have been updated when this is raised.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['task_type']) is not None:
    setv(
        parent_object,
        ['requests[]', 'taskType'],
        getv(from_object, ['task_type']),
    )
  if getv(from_object, ['title']) is not None:
    setv(parent_object, ['requests[]', 'title'], getv(from_object, ['title']))
  if getv(from_object, ['output_dimensionality']) is not None:
    setv(
        parent_object,
        ['requests[]', 'outputDimensionality'],
        getv(from_object, ['output_dimensionality']),
    )
  if getv(from_object, ['mime_type']) is not None:
    raise ValueError('mime_type parameter is not supported in Gemini API.')
  if getv(from_object, ['auto_truncate']) is not None:
    raise ValueError('auto_truncate parameter is not supported in Gemini API.')
  return to_object


def _EmbeddingsBatchJobSource_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts an embeddings batch-job source for the Gemini API.

  `inlined_requests` is converted with `_EmbedContentBatch_to_mldev` and
  stored under `requests`.

  Raises:
    ValueError: Propagated from `_EmbedContentBatch_to_mldev`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['file_name']) is not None:
    # NOTE(review): the output key stays snake_case ('file_name') here, while
    # `_BatchJobSource_to_mldev` emits 'fileName' - confirm with the generator
    # whether this difference is intentional.
    setv(to_object, ['file_name'], getv(from_object, ['file_name']))
  if getv(from_object, ['inlined_requests']) is not None:
    setv(
        to_object,
        ['requests'],
        _EmbedContentBatch_to_mldev(
            api_client, getv(from_object, ['inlined_requests']), to_object
        ),
    )
  return to_object


def _FileData_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts file data (`file_uri`, `mime_type`) to camelCase.

  Raises:
    ValueError: If `display_name` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['display_name']) is not None:
    raise ValueError('display_name parameter is not supported in Gemini API.')
  if getv(from_object, ['file_uri']) is not None:
    setv(to_object, ['fileUri'], getv(from_object, ['file_uri']))
  if getv(from_object, ['mime_type']) is not None:
    setv(to_object, ['mimeType'], getv(from_object, ['mime_type']))
  return to_object


def _FunctionCall_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Copies the `id`, `args` and `name` of a function call.

  Raises:
    ValueError: If `partial_args` or `will_continue` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['id']) is not None:
    setv(to_object, ['id'], getv(from_object, ['id']))
  if getv(from_object, ['args']) is not None:
    setv(to_object, ['args'], getv(from_object, ['args']))
  if getv(from_object, ['name']) is not None:
    setv(to_object, ['name'], getv(from_object, ['name']))
  if getv(from_object, ['partial_args']) is not None:
    raise ValueError('partial_args parameter is not supported in Gemini API.')
  if getv(from_object, ['will_continue']) is not None:
    raise ValueError('will_continue parameter is not supported in Gemini API.')
  return to_object


def _FunctionCallingConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a function-calling config to camelCase.

  Copies `allowed_function_names` (as `allowedFunctionNames`) and `mode`.

  Raises:
    ValueError: If `stream_function_call_arguments` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['allowed_function_names']) is not None:
    setv(
        to_object,
        ['allowedFunctionNames'],
        getv(from_object, ['allowed_function_names']),
    )
  if getv(from_object, ['mode']) is not None:
    setv(to_object, ['mode'], getv(from_object, ['mode']))
  if getv(from_object, ['stream_function_call_arguments']) is not None:
    raise ValueError(
        'stream_function_call_arguments parameter is not supported in Gemini'
        ' API.'
    )
  return to_object


def _GenerateContentConfig_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a generate-content config for the Gemini API.

  Most fields are written to the returned dict. Five are written into
  `parent_object` instead: `systemInstruction`, `safetySettings`, `tools`,
  `toolConfig` and `cachedContent`.

  Args:
    api_client: Passed to the `t.*` transformers that require it.
    from_object: The snake_case config.
    parent_object: Dict that receives the five fields listed above. It is
      handed to `setv` as-is, so callers are expected to supply it whenever
      one of those fields is set.

  Returns:
    The camelCase dict holding the remaining fields.

  Raises:
    ValueError: If `routing_config`, `model_selection_config`, `labels`,
      `audio_timestamp` or `model_armor_config` is set, or when a nested
      converter rejects a field. Fields are processed in order, so
      `parent_object` may already have been partially updated when this is
      raised.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['system_instruction']) is not None:
    setv(
        parent_object,
        ['systemInstruction'],
        _Content_to_mldev(
            t.t_content(getv(from_object, ['system_instruction'])), to_object
        ),
    )
  if getv(from_object, ['temperature']) is not None:
    setv(to_object, ['temperature'], getv(from_object, ['temperature']))
  if getv(from_object, ['top_p']) is not None:
    setv(to_object, ['topP'], getv(from_object, ['top_p']))
  if getv(from_object, ['top_k']) is not None:
    setv(to_object, ['topK'], getv(from_object, ['top_k']))
  if getv(from_object, ['candidate_count']) is not None:
    setv(to_object, ['candidateCount'], getv(from_object, ['candidate_count']))
  if getv(from_object, ['max_output_tokens']) is not None:
    setv(
        to_object, ['maxOutputTokens'], getv(from_object, ['max_output_tokens'])
    )
  if getv(from_object, ['stop_sequences']) is not None:
    setv(to_object, ['stopSequences'], getv(from_object, ['stop_sequences']))
  if getv(from_object, ['response_logprobs']) is not None:
    setv(
        to_object,
        ['responseLogprobs'],
        getv(from_object, ['response_logprobs']),
    )
  if getv(from_object, ['logprobs']) is not None:
    setv(to_object, ['logprobs'], getv(from_object, ['logprobs']))
  if getv(from_object, ['presence_penalty']) is not None:
    setv(
        to_object, ['presencePenalty'], getv(from_object, ['presence_penalty'])
    )
  if getv(from_object, ['frequency_penalty']) is not None:
    setv(
        to_object,
        ['frequencyPenalty'],
        getv(from_object, ['frequency_penalty']),
    )
  if getv(from_object, ['seed']) is not None:
    setv(to_object, ['seed'], getv(from_object, ['seed']))
  if getv(from_object, ['response_mime_type']) is not None:
    setv(
        to_object,
        ['responseMimeType'],
        getv(from_object, ['response_mime_type']),
    )
  if getv(from_object, ['response_schema']) is not None:
    setv(
        to_object,
        ['responseSchema'],
        t.t_schema(api_client, getv(from_object, ['response_schema'])),
    )
  if getv(from_object, ['response_json_schema']) is not None:
    setv(
        to_object,
        ['responseJsonSchema'],
        getv(from_object, ['response_json_schema']),
    )
  if getv(from_object, ['routing_config']) is not None:
    raise ValueError('routing_config parameter is not supported in Gemini API.')
  if getv(from_object, ['model_selection_config']) is not None:
    raise ValueError(
        'model_selection_config parameter is not supported in Gemini API.'
    )
  if getv(from_object, ['safety_settings']) is not None:
    setv(
        parent_object,
        ['safetySettings'],
        [
            _SafetySetting_to_mldev(item, to_object)
            for item in getv(from_object, ['safety_settings'])
        ],
    )
  if getv(from_object, ['tools']) is not None:
    # `_Tool_to_mldev` is not defined in the part of the file shown here.
    setv(
        parent_object,
        ['tools'],
        [
            _Tool_to_mldev(t.t_tool(api_client, item), to_object)
            for item in t.t_tools(api_client, getv(from_object, ['tools']))
        ],
    )
  if getv(from_object, ['tool_config']) is not None:
    setv(
        parent_object,
        ['toolConfig'],
        _ToolConfig_to_mldev(getv(from_object, ['tool_config']), to_object),
    )
  if getv(from_object, ['labels']) is not None:
    raise ValueError('labels parameter is not supported in Gemini API.')
  if getv(from_object, ['cached_content']) is not None:
    setv(
        parent_object,
        ['cachedContent'],
        t.t_cached_content_name(
            api_client, getv(from_object, ['cached_content'])
        ),
    )
  if getv(from_object, ['response_modalities']) is not None:
    setv(
        to_object,
        ['responseModalities'],
        getv(from_object, ['response_modalities']),
    )
  if getv(from_object, ['media_resolution']) is not None:
    setv(
        to_object, ['mediaResolution'], getv(from_object, ['media_resolution'])
    )
  if getv(from_object, ['speech_config']) is not None:
    setv(
        to_object,
        ['speechConfig'],
        t.t_speech_config(getv(from_object, ['speech_config'])),
    )
  if getv(from_object, ['audio_timestamp']) is not None:
    raise ValueError(
        'audio_timestamp parameter is not supported in Gemini API.'
    )
  if getv(from_object, ['thinking_config']) is not None:
    setv(to_object, ['thinkingConfig'], getv(from_object, ['thinking_config']))
  if getv(from_object, ['image_config']) is not None:
    setv(
        to_object,
        ['imageConfig'],
        _ImageConfig_to_mldev(getv(from_object, ['image_config']), to_object),
    )
  if getv(from_object, ['enable_enhanced_civic_answers']) is not None:
    setv(
        to_object,
        ['enableEnhancedCivicAnswers'],
        getv(from_object, ['enable_enhanced_civic_answers']),
    )
  if getv(from_object, ['model_armor_config']) is not None:
    raise ValueError(
        'model_armor_config parameter is not supported in Gemini API.'
    )
  return to_object


def _GenerateContentResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a camelCase generate-content response to snake_case.

  Each candidate is converted with `_Candidate_from_mldev`; the other fields
  are copied unchanged under their snake_case names.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )
  if getv(from_object, ['candidates']) is not None:
    setv(
        to_object,
        ['candidates'],
        [
            _Candidate_from_mldev(item, to_object)
            for item in getv(from_object, ['candidates'])
        ],
    )
  if getv(from_object, ['modelVersion']) is not None:
    setv(to_object, ['model_version'], getv(from_object, ['modelVersion']))
  if getv(from_object, ['promptFeedback']) is not None:
    setv(to_object, ['prompt_feedback'], getv(from_object, ['promptFeedback']))
  if getv(from_object, ['responseId']) is not None:
    setv(to_object, ['response_id'], getv(from_object, ['responseId']))
  if getv(from_object, ['usageMetadata']) is not None:
    setv(to_object, ['usage_metadata'], getv(from_object, ['usageMetadata']))
  return to_object


def _GetBatchJobParameters_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the get-batch-job request dict (mldev variant).

  Stores `name`, passed through `t.t_batch_job_name`, under `_url.name`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )
  return to_object


def _GetBatchJobParameters_to_vertex(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the get-batch-job request dict (vertex variant).

  Same body as `_GetBatchJobParameters_to_mldev`: stores `name`, passed
  through `t.t_batch_job_name`, under `_url.name`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['name']) is not None:
    setv(
        to_object,
        ['_url', 'name'],
        t.t_batch_job_name(api_client, getv(from_object, ['name'])),
    )
  return to_object


def _GoogleMaps_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a Google Maps tool config; copies `enable_widget`.

  Raises:
    ValueError: If `auth_config` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['auth_config']) is not None:
    raise ValueError('auth_config parameter is not supported in Gemini API.')
  if getv(from_object, ['enable_widget']) is not None:
    setv(to_object, ['enableWidget'], getv(from_object, ['enable_widget']))
  return to_object


def _GoogleSearch_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a Google Search tool config; copies `time_range_filter`.

  Raises:
    ValueError: If `exclude_domains` or `blocking_confidence` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['exclude_domains']) is not None:
    raise ValueError(
        'exclude_domains parameter is not supported in Gemini API.'
    )
  if getv(from_object, ['blocking_confidence']) is not None:
    raise ValueError(
        'blocking_confidence parameter is not supported in Gemini API.'
    )
  if getv(from_object, ['time_range_filter']) is not None:
    setv(
        to_object, ['timeRangeFilter'], getv(from_object, ['time_range_filter'])
    )
  return to_object


def _ImageConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts an image config; copies `aspect_ratio` and `image_size`.

  Raises:
    ValueError: If `person_generation`, `output_mime_type` or
      `output_compression_quality` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['aspect_ratio']) is not None:
    setv(to_object, ['aspectRatio'], getv(from_object, ['aspect_ratio']))
  if getv(from_object, ['image_size']) is not None:
    setv(to_object, ['imageSize'], getv(from_object, ['image_size']))
  if getv(from_object, ['person_generation']) is not None:
    raise ValueError(
        'person_generation parameter is not supported in Gemini API.'
    )
  if getv(from_object, ['output_mime_type']) is not None:
    raise ValueError(
        'output_mime_type parameter is not supported in Gemini API.'
    )
  if getv(from_object, ['output_compression_quality']) is not None:
    raise ValueError(
        'output_compression_quality parameter is not supported in Gemini API.'
    )
  return to_object


def _InlinedRequest_to_mldev(
    api_client: BaseApiClient,
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts one inlined batch request for the Gemini API.

  `model`, `contents` and `config` are nested under `request` (as `model`,
  `contents` and `generationConfig`); `metadata` stays at the top level.

  Raises:
    ValueError: Propagated from the nested converters.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['model']) is not None:
    setv(
        to_object,
        ['request', 'model'],
        t.t_model(api_client, getv(from_object, ['model'])),
    )
  if getv(from_object, ['contents']) is not None:
    setv(
        to_object,
        ['request', 'contents'],
        [
            _Content_to_mldev(item, to_object)
            for item in t.t_contents(getv(from_object, ['contents']))
        ],
    )
  if getv(from_object, ['metadata']) is not None:
    setv(to_object, ['metadata'], getv(from_object, ['metadata']))
  if getv(from_object, ['config']) is not None:
    # The config converter receives the `request` sub-dict as its parent so
    # that its parent-level fields are written next to `generationConfig`.
    # NOTE(review): when no `request` entry exists yet, the parent is the
    # `default_value={}` literal, so parent-level fields would presumably go
    # to a throwaway dict - confirm against `getv`.
    setv(
        to_object,
        ['request', 'generationConfig'],
        _GenerateContentConfig_to_mldev(
            api_client,
            getv(from_object, ['config']),
            getv(to_object, ['request'], default_value={}),
        ),
    )
  return to_object


def _InlinedResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts one inlined batch response.

  `response` is converted with `_GenerateContentResponse_from_mldev`; `error`
  is copied unchanged.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['response']) is not None:
    setv(
        to_object,
        ['response'],
        _GenerateContentResponse_from_mldev(
            getv(from_object, ['response']), to_object
        ),
    )
  if getv(from_object, ['error']) is not None:
    setv(to_object, ['error'], getv(from_object, ['error']))
  return to_object


def _ListBatchJobsConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Writes list-batch-jobs paging options into `parent_object` (mldev).

  `page_size` and `page_token` are written under `_query` as `pageSize` and
  `pageToken`.

  Returns:
    An empty dict; nothing is written to the local result.

  Raises:
    ValueError: If `filter` is set. `parent_object` may already have been
      updated when this is raised.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['page_size']) is not None:
    setv(
        parent_object, ['_query', 'pageSize'], getv(from_object, ['page_size'])
    )
  if getv(from_object, ['page_token']) is not None:
    setv(
        parent_object,
        ['_query', 'pageToken'],
        getv(from_object, ['page_token']),
    )
  if getv(from_object, ['filter']) is not None:
    raise ValueError('filter parameter is not supported in Gemini API.')
  return to_object


def _ListBatchJobsConfig_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Writes list-batch-jobs options into `parent_object` (vertex).

  `page_size`, `page_token` and `filter` are written under `_query` as
  `pageSize`, `pageToken` and `filter`.

  Returns:
    An empty dict; nothing is written to the local result.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['page_size']) is not None:
    setv(
        parent_object, ['_query', 'pageSize'], getv(from_object, ['page_size'])
    )
  if getv(from_object, ['page_token']) is not None:
    setv(
        parent_object,
        ['_query', 'pageToken'],
        getv(from_object, ['page_token']),
    )
  if getv(from_object, ['filter']) is not None:
    setv(parent_object, ['_query', 'filter'], getv(from_object, ['filter']))
  return to_object


def _ListBatchJobsParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the list-batch-jobs request dict (mldev variant).

  Raises:
    ValueError: Propagated from `_ListBatchJobsConfig_to_mldev`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['config']) is not None:
    # Return value ignored: the config converter mutates `to_object`.
    _ListBatchJobsConfig_to_mldev(getv(from_object, ['config']), to_object)
  return to_object


def _ListBatchJobsParameters_to_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the list-batch-jobs request dict (vertex variant)."""
  to_object: dict[str, Any] = {}
  if getv(from_object, ['config']) is not None:
    # Return value ignored: the config converter mutates `to_object`.
    _ListBatchJobsConfig_to_vertex(getv(from_object, ['config']), to_object)
  return to_object


def _ListBatchJobsResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a list-batch-jobs response (mldev variant) to snake_case.

  Each entry of `operations` is converted with `_BatchJob_from_mldev` and the
  resulting list is stored under `batch_jobs`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )
  if getv(from_object, ['nextPageToken']) is not None:
    setv(to_object, ['next_page_token'], getv(from_object, ['nextPageToken']))
  if getv(from_object, ['operations']) is not None:
    setv(
        to_object,
        ['batch_jobs'],
        [
            _BatchJob_from_mldev(item, to_object)
            for item in getv(from_object, ['operations'])
        ],
    )
  return to_object


def _ListBatchJobsResponse_from_vertex(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts a list-batch-jobs response (vertex variant) to snake_case.

  Each entry of `batchPredictionJobs` is converted with
  `_BatchJob_from_vertex` and the resulting list is stored under `batch_jobs`.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['sdkHttpResponse']) is not None:
    setv(
        to_object, ['sdk_http_response'], getv(from_object, ['sdkHttpResponse'])
    )
  if getv(from_object, ['nextPageToken']) is not None:
    setv(to_object, ['next_page_token'], getv(from_object, ['nextPageToken']))
  if getv(from_object, ['batchPredictionJobs']) is not None:
    setv(
        to_object,
        ['batch_jobs'],
        [
            _BatchJob_from_vertex(item, to_object)
            for item in getv(from_object, ['batchPredictionJobs'])
        ],
    )
  return to_object


def _Part_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Converts one content part to camelCase.

  `file_data`, `function_call` and `inline_data` are converted with
  `_FileData_to_mldev`, `_FunctionCall_to_mldev` and `_Blob_to_mldev`; the
  remaining fields are copied unchanged under their camelCase names.

  Raises:
    ValueError: Propagated from the three nested converters.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['media_resolution']) is not None:
    setv(
        to_object, ['mediaResolution'], getv(from_object, ['media_resolution'])
    )
  if getv(from_object, ['code_execution_result']) is not None:
    setv(
        to_object,
        ['codeExecutionResult'],
        getv(from_object, ['code_execution_result']),
    )
  if getv(from_object, ['executable_code']) is not None:
    setv(to_object, ['executableCode'], getv(from_object, ['executable_code']))
  if getv(from_object, ['file_data']) is not None:
    setv(
        to_object,
        ['fileData'],
        _FileData_to_mldev(getv(from_object, ['file_data']), to_object),
    )
  if getv(from_object, ['function_call']) is not None:
    setv(
        to_object,
        ['functionCall'],
        _FunctionCall_to_mldev(getv(from_object, ['function_call']), to_object),
    )
  if getv(from_object, ['function_response']) is not None:
    setv(
        to_object,
        ['functionResponse'],
        getv(from_object, ['function_response']),
    )
  if getv(from_object, ['inline_data']) is not None:
    setv(
        to_object,
        ['inlineData'],
        _Blob_to_mldev(getv(from_object, ['inline_data']), to_object),
    )
  if getv(from_object, ['text']) is not None:
    setv(to_object, ['text'], getv(from_object, ['text']))
  if getv(from_object, ['thought']) is not None:
    setv(to_object, ['thought'], getv(from_object, ['thought']))
  if getv(from_object, ['thought_signature']) is not None:
    setv(
        to_object,
        ['thoughtSignature'],
        getv(from_object, ['thought_signature']),
    )
  if getv(from_object, ['video_metadata']) is not None:
    setv(to_object, ['videoMetadata'], getv(from_object, ['video_metadata']))
  return to_object


def _SafetySetting_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Copies the `category` and `threshold` of a safety setting.

  Raises:
    ValueError: If `method` is set.
  """
  to_object: dict[str, Any] = {}
  if getv(from_object, ['category']) is not None:
    setv(to_object, ['category'], getv(from_object, ['category']))
  if getv(from_object, ['method']) is not None:
    raise ValueError('method parameter is not supported in Gemini API.')
  if getv(from_object, ['threshold']) is not None:
    setv(to_object, ['threshold'], getv(from_object, ['threshold']))
  return to_object


def _ToolConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  to_object: dict[str, Any] = {}
  if getv(from_object, ['retrieval_config']) is not None:
    setv(
        to_object, ['retrievalConfig'], getv(from_object, ['retrieval_config'])
    )
  if
getv(from_object, ['function_calling_config']) is not None: setv( to_object, ['functionCallingConfig'], _FunctionCallingConfig_to_mldev( getv(from_object, ['function_calling_config']), to_object ), ) return to_object def _Tool_to_mldev( from_object: Union[dict[str, Any], object], parent_object: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: to_object: dict[str, Any] = {} if getv(from_object, ['retrieval']) is not None: raise ValueError('retrieval parameter is not supported in Gemini API.') if getv(from_object, ['computer_use']) is not None: setv(to_object, ['computerUse'], getv(from_object, ['computer_use'])) if getv(from_object, ['file_search']) is not None: setv(to_object, ['fileSearch'], getv(from_object, ['file_search'])) if getv(from_object, ['code_execution']) is not None: setv(to_object, ['codeExecution'], getv(from_object, ['code_execution'])) if getv(from_object, ['enterprise_web_search']) is not None: raise ValueError( 'enterprise_web_search parameter is not supported in Gemini API.' 
) if getv(from_object, ['function_declarations']) is not None: setv( to_object, ['functionDeclarations'], [item for item in getv(from_object, ['function_declarations'])], ) if getv(from_object, ['google_maps']) is not None: setv( to_object, ['googleMaps'], _GoogleMaps_to_mldev(getv(from_object, ['google_maps']), to_object), ) if getv(from_object, ['google_search']) is not None: setv( to_object, ['googleSearch'], _GoogleSearch_to_mldev(getv(from_object, ['google_search']), to_object), ) if getv(from_object, ['google_search_retrieval']) is not None: setv( to_object, ['googleSearchRetrieval'], getv(from_object, ['google_search_retrieval']), ) if getv(from_object, ['url_context']) is not None: setv(to_object, ['urlContext'], getv(from_object, ['url_context'])) return to_object class Batches(_api_module.BaseModule): def _create( self, *, model: Optional[str] = None, src: Union[types.BatchJobSourceUnion, types.BatchJobSourceUnionDict], config: Optional[types.CreateBatchJobConfigOrDict] = None, ) -> types.BatchJob: parameter_model = types._CreateBatchJobParameters( model=model, src=src, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _CreateBatchJobParameters_to_vertex( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs'.format_map(request_url_dict) else: path = 'batchPredictionJobs' else: request_dict = _CreateBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = '{model}:batchGenerateContent'.format_map(request_url_dict) else: path = '{model}:batchGenerateContent' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. 
request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = self._api_client.request( 'post', path, request_dict, http_options ) response_dict = {} if not response.body else json.loads(response.body) if self._api_client.vertexai: response_dict = _BatchJob_from_vertex(response_dict) if not self._api_client.vertexai: response_dict = _BatchJob_from_mldev(response_dict) return_value = types.BatchJob._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) self._api_client._verify_response(return_value) return return_value def _create_embeddings( self, *, model: Optional[str] = None, src: types.EmbeddingsBatchJobSourceOrDict, config: Optional[types.CreateEmbeddingsBatchJobConfigOrDict] = None, ) -> types.BatchJob: parameter_model = types._CreateEmbeddingsBatchJobParameters( model=model, src=src, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: raise ValueError( 'This method is only supported in the Gemini Developer client.' ) else: request_dict = _CreateEmbeddingsBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = '{model}:asyncBatchEmbedContent'.format_map(request_url_dict) else: path = '{model}:asyncBatchEmbedContent' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. 
request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = self._api_client.request( 'post', path, request_dict, http_options ) response_dict = {} if not response.body else json.loads(response.body) if not self._api_client.vertexai: response_dict = _BatchJob_from_mldev(response_dict) return_value = types.BatchJob._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) self._api_client._verify_response(return_value) return return_value def get( self, *, name: str, config: Optional[types.GetBatchJobConfigOrDict] = None ) -> types.BatchJob: """Gets a batch job. Args: name (str): A fully-qualified BatchJob resource name or ID. Example: "projects/.../locations/.../batchPredictionJobs/456" or "456" when project and location are initialized in the Vertex AI client. Or "batches/abc" using the Gemini Developer AI client. Returns: A BatchJob object that contains details about the batch job. Usage: .. 
code-block:: python batch_job = client.batches.get(name='123456789') print(f"Batch job: {batch_job.name}, state {batch_job.state}") """ parameter_model = types._GetBatchJobParameters( name=name, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _GetBatchJobParameters_to_vertex( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs/{name}'.format_map(request_url_dict) else: path = 'batchPredictionJobs/{name}' else: request_dict = _GetBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batches/{name}'.format_map(request_url_dict) else: path = 'batches/{name}' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = self._api_client.request('get', path, request_dict, http_options) response_dict = {} if not response.body else json.loads(response.body) if self._api_client.vertexai: response_dict = _BatchJob_from_vertex(response_dict) if not self._api_client.vertexai: response_dict = _BatchJob_from_mldev(response_dict) return_value = types.BatchJob._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) self._api_client._verify_response(return_value) return return_value def cancel( self, *, name: str, config: Optional[types.CancelBatchJobConfigOrDict] = None, ) -> None: """Cancels a batch job. Only available for batch jobs that are running or pending. 
Args: name (str): A fully-qualified BatchJob resource name or ID. Example: "projects/.../locations/.../batchPredictionJobs/456" or "456" when project and location are initialized in the Vertex AI client. Or "batches/abc" using the Gemini Developer AI client. Usage: .. code-block:: python client.batches.cancel(name='123456789') """ parameter_model = types._CancelBatchJobParameters( name=name, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _CancelBatchJobParameters_to_vertex( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs/{name}:cancel'.format_map(request_url_dict) else: path = 'batchPredictionJobs/{name}:cancel' else: request_dict = _CancelBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batches/{name}:cancel'.format_map(request_url_dict) else: path = 'batches/{name}:cancel' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. 
request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = self._api_client.request( 'post', path, request_dict, http_options ) def _list( self, *, config: Optional[types.ListBatchJobsConfigOrDict] = None ) -> types.ListBatchJobsResponse: parameter_model = types._ListBatchJobsParameters( config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _ListBatchJobsParameters_to_vertex(parameter_model) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs'.format_map(request_url_dict) else: path = 'batchPredictionJobs' else: request_dict = _ListBatchJobsParameters_to_mldev(parameter_model) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batches'.format_map(request_url_dict) else: path = 'batches' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. 
request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = self._api_client.request('get', path, request_dict, http_options) response_dict = {} if not response.body else json.loads(response.body) if self._api_client.vertexai: response_dict = _ListBatchJobsResponse_from_vertex(response_dict) if not self._api_client.vertexai: response_dict = _ListBatchJobsResponse_from_mldev(response_dict) return_value = types.ListBatchJobsResponse._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) return_value.sdk_http_response = types.HttpResponse( headers=response.headers ) self._api_client._verify_response(return_value) return return_value def delete( self, *, name: str, config: Optional[types.DeleteBatchJobConfigOrDict] = None, ) -> types.DeleteResourceJob: """Deletes a batch job. Args: name (str): A fully-qualified BatchJob resource name or ID. Example: "projects/.../locations/.../batchPredictionJobs/456" or "456" when project and location are initialized in the client. Returns: A DeleteResourceJob object that shows the status of the deletion. Usage: .. 
code-block:: python client.batches.delete(name='123456789') """ parameter_model = types._DeleteBatchJobParameters( name=name, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _DeleteBatchJobParameters_to_vertex( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs/{name}'.format_map(request_url_dict) else: path = 'batchPredictionJobs/{name}' else: request_dict = _DeleteBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batches/{name}'.format_map(request_url_dict) else: path = 'batches/{name}' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = self._api_client.request( 'delete', path, request_dict, http_options ) response_dict = {} if not response.body else json.loads(response.body) if self._api_client.vertexai: response_dict = _DeleteResourceJob_from_vertex(response_dict) if not self._api_client.vertexai: response_dict = _DeleteResourceJob_from_mldev(response_dict) return_value = types.DeleteResourceJob._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) return_value.sdk_http_response = types.HttpResponse( headers=response.headers ) self._api_client._verify_response(return_value) return return_value def create( self, *, model: str, src: types.BatchJobSourceUnionDict, config: Optional[types.CreateBatchJobConfigOrDict] = None, ) -> types.BatchJob: """Creates a batch job. 
Args: model (str): The model to use for the batch job. src: The source of the batch job. Currently Vertex AI supports GCS URI(-s) or BigQuery URI. Example: "gs://path/to/input/data" or "bq://projectId.bqDatasetId.bqTableId". Gemini Developer API supports List of inlined_request, or file name. Example: "files/file_name". config (CreateBatchJobConfig): Optional configuration for the batch job. Returns: A BatchJob object that contains details about the batch job. Usage: .. code-block:: python batch_job = client.batches.create( model="gemini-2.0-flash-001", src="gs://path/to/input/data", ) print(batch_job.state) """ src = t.t_batch_job_source(self._api_client, src) # Convert all dicts to Pydantic objects. parameter_model = types._CreateBatchJobParameters( model=model, src=src, config=config, ) if self._api_client.vertexai: config = _extra_utils.format_destination(src, parameter_model.config) return self._create(model=model, src=src, config=config) else: return self._create(model=model, src=src, config=config) def create_embeddings( self, *, model: str, src: types.EmbeddingsBatchJobSourceOrDict, config: Optional[types.CreateEmbeddingsBatchJobConfigOrDict] = None, ) -> types.BatchJob: """**Experimental** Creates an embedding batch job. Args: model (str): The model to use for the batch job. src: Gemini Developer API supports List of inlined_request, or file name. Example: "files/file_name". config (CreateBatchJobConfig): Optional configuration for the batch job. Returns: A BatchJob object that contains details about the batch job. Usage: .. code-block:: python batch_job = client.batches.create_embeddings( model="text-embedding-004", src="files/my_embedding_input", ) print(batch_job.state) """ import warnings warnings.warn( 'batches.create_embeddings() is experimental and may change without' ' notice.', category=_common.ExperimentalWarning, stacklevel=2, # This is crucial! 
) src = t.t_embedding_batch_job_source(self._api_client, src) # Convert all dicts to Pydantic objects. parameter_model = types._CreateEmbeddingsBatchJobParameters( model=model, src=src, config=config, ) if self._api_client.vertexai: raise ValueError('Vertex AI does not support batches.create_embeddings.') else: return self._create_embeddings(model=model, src=src, config=config) def list( self, *, config: Optional[types.ListBatchJobsConfigOrDict] = None ) -> Pager[types.BatchJob]: """Lists batch jobs. Args: config (ListBatchJobsConfig): Optional configuration for the list request. Returns: A Pager object that contains one page of batch jobs. When iterating over the pager, it automatically fetches the next page if there are more. Usage: .. code-block:: python config = {'page_size': 10} for batch_job in client.batches.list(config): print(batch_job.name) """ list_request = self._list return Pager( 'batch_jobs', list_request, self._list(config=config), config, ) class AsyncBatches(_api_module.BaseModule): async def _create( self, *, model: Optional[str] = None, src: Union[types.BatchJobSourceUnion, types.BatchJobSourceUnionDict], config: Optional[types.CreateBatchJobConfigOrDict] = None, ) -> types.BatchJob: parameter_model = types._CreateBatchJobParameters( model=model, src=src, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _CreateBatchJobParameters_to_vertex( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs'.format_map(request_url_dict) else: path = 'batchPredictionJobs' else: request_dict = _CreateBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = '{model}:batchGenerateContent'.format_map(request_url_dict) else: path = '{model}:batchGenerateContent' query_params = request_dict.get('_query') if query_params: path = 
f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = await self._api_client.async_request( 'post', path, request_dict, http_options ) response_dict = {} if not response.body else json.loads(response.body) if self._api_client.vertexai: response_dict = _BatchJob_from_vertex(response_dict) if not self._api_client.vertexai: response_dict = _BatchJob_from_mldev(response_dict) return_value = types.BatchJob._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) self._api_client._verify_response(return_value) return return_value async def _create_embeddings( self, *, model: Optional[str] = None, src: types.EmbeddingsBatchJobSourceOrDict, config: Optional[types.CreateEmbeddingsBatchJobConfigOrDict] = None, ) -> types.BatchJob: parameter_model = types._CreateEmbeddingsBatchJobParameters( model=model, src=src, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: raise ValueError( 'This method is only supported in the Gemini Developer client.' ) else: request_dict = _CreateEmbeddingsBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = '{model}:asyncBatchEmbedContent'.format_map(request_url_dict) else: path = '{model}:asyncBatchEmbedContent' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. 
request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = await self._api_client.async_request( 'post', path, request_dict, http_options ) response_dict = {} if not response.body else json.loads(response.body) if not self._api_client.vertexai: response_dict = _BatchJob_from_mldev(response_dict) return_value = types.BatchJob._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) self._api_client._verify_response(return_value) return return_value async def get( self, *, name: str, config: Optional[types.GetBatchJobConfigOrDict] = None ) -> types.BatchJob: """Gets a batch job. Args: name (str): A fully-qualified BatchJob resource name or ID. Example: "projects/.../locations/.../batchPredictionJobs/456" or "456" when project and location are initialized in the Vertex AI client. Or "batches/abc" using the Gemini Developer AI client. Returns: A BatchJob object that contains details about the batch job. Usage: .. 
code-block:: python batch_job = await client.aio.batches.get(name='123456789') print(f"Batch job: {batch_job.name}, state {batch_job.state}") """ parameter_model = types._GetBatchJobParameters( name=name, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _GetBatchJobParameters_to_vertex( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs/{name}'.format_map(request_url_dict) else: path = 'batchPredictionJobs/{name}' else: request_dict = _GetBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batches/{name}'.format_map(request_url_dict) else: path = 'batches/{name}' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = await self._api_client.async_request( 'get', path, request_dict, http_options ) response_dict = {} if not response.body else json.loads(response.body) if self._api_client.vertexai: response_dict = _BatchJob_from_vertex(response_dict) if not self._api_client.vertexai: response_dict = _BatchJob_from_mldev(response_dict) return_value = types.BatchJob._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) self._api_client._verify_response(return_value) return return_value async def cancel( self, *, name: str, config: Optional[types.CancelBatchJobConfigOrDict] = None, ) -> None: """Cancels a batch job. Only available for batch jobs that are running or pending. 
Args: name (str): A fully-qualified BatchJob resource name or ID. Example: "projects/.../locations/.../batchPredictionJobs/456" or "456" when project and location are initialized in the Vertex AI client. Or "batches/abc" using the Gemini Developer AI client. Usage: .. code-block:: python await client.aio.batches.cancel(name='123456789') """ parameter_model = types._CancelBatchJobParameters( name=name, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _CancelBatchJobParameters_to_vertex( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs/{name}:cancel'.format_map(request_url_dict) else: path = 'batchPredictionJobs/{name}:cancel' else: request_dict = _CancelBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batches/{name}:cancel'.format_map(request_url_dict) else: path = 'batches/{name}:cancel' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. 
request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = await self._api_client.async_request( 'post', path, request_dict, http_options ) async def _list( self, *, config: Optional[types.ListBatchJobsConfigOrDict] = None ) -> types.ListBatchJobsResponse: parameter_model = types._ListBatchJobsParameters( config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _ListBatchJobsParameters_to_vertex(parameter_model) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs'.format_map(request_url_dict) else: path = 'batchPredictionJobs' else: request_dict = _ListBatchJobsParameters_to_mldev(parameter_model) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batches'.format_map(request_url_dict) else: path = 'batches' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. 
request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = await self._api_client.async_request( 'get', path, request_dict, http_options ) response_dict = {} if not response.body else json.loads(response.body) if self._api_client.vertexai: response_dict = _ListBatchJobsResponse_from_vertex(response_dict) if not self._api_client.vertexai: response_dict = _ListBatchJobsResponse_from_mldev(response_dict) return_value = types.ListBatchJobsResponse._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) return_value.sdk_http_response = types.HttpResponse( headers=response.headers ) self._api_client._verify_response(return_value) return return_value async def delete( self, *, name: str, config: Optional[types.DeleteBatchJobConfigOrDict] = None, ) -> types.DeleteResourceJob: """Deletes a batch job. Args: name (str): A fully-qualified BatchJob resource name or ID. Example: "projects/.../locations/.../batchPredictionJobs/456" or "456" when project and location are initialized in the client. Returns: A DeleteResourceJob object that shows the status of the deletion. Usage: .. 
code-block:: python await client.aio.batches.delete(name='123456789') """ parameter_model = types._DeleteBatchJobParameters( name=name, config=config, ) request_url_dict: Optional[dict[str, str]] if self._api_client.vertexai: request_dict = _DeleteBatchJobParameters_to_vertex( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batchPredictionJobs/{name}'.format_map(request_url_dict) else: path = 'batchPredictionJobs/{name}' else: request_dict = _DeleteBatchJobParameters_to_mldev( self._api_client, parameter_model ) request_url_dict = request_dict.get('_url') if request_url_dict: path = 'batches/{name}'.format_map(request_url_dict) else: path = 'batches/{name}' query_params = request_dict.get('_query') if query_params: path = f'{path}?{urlencode(query_params)}' # TODO: remove the hack that pops config. request_dict.pop('config', None) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options request_dict = _common.convert_to_dict(request_dict) request_dict = _common.encode_unserializable_types(request_dict) response = await self._api_client.async_request( 'delete', path, request_dict, http_options ) response_dict = {} if not response.body else json.loads(response.body) if self._api_client.vertexai: response_dict = _DeleteResourceJob_from_vertex(response_dict) if not self._api_client.vertexai: response_dict = _DeleteResourceJob_from_mldev(response_dict) return_value = types.DeleteResourceJob._from_response( response=response_dict, kwargs=parameter_model.model_dump() ) return_value.sdk_http_response = types.HttpResponse( headers=response.headers ) self._api_client._verify_response(return_value) return return_value async def create( self, *, model: str, src: types.BatchJobSourceUnionDict, config: Optional[types.CreateBatchJobConfigOrDict] = None, ) -> types.BatchJob: """Creates 
a batch job asynchronously. Args: model (str): The model to use for the batch job. src: The source of the batch job. Currently Vertex AI supports GCS URI(-s) or BigQuery URI. Example: "gs://path/to/input/data" or "bq://projectId.bqDatasetId.bqTableId". Gemini Develop API supports List of inlined_request, or file name. Example: "files/file_name". config (CreateBatchJobConfig): Optional configuration for the batch job. Returns: A BatchJob object that contains details about the batch job. Usage: .. code-block:: python batch_job = await client.aio.batches.create( model="gemini-2.0-flash-001", src="gs://path/to/input/data", ) """ src = t.t_batch_job_source(self._api_client, src) # Convert all dicts to Pydantic objects. parameter_model = types._CreateBatchJobParameters( model=model, src=src, config=config, ) if self._api_client.vertexai: config = _extra_utils.format_destination(src, parameter_model.config) return await self._create(model=model, src=src, config=config) else: return await self._create(model=model, src=src, config=config) async def create_embeddings( self, *, model: str, src: types.EmbeddingsBatchJobSourceOrDict, config: Optional[types.CreateEmbeddingsBatchJobConfigOrDict] = None, ) -> types.BatchJob: """**Experimental** Creates an asynchronously embedding batch job. Args: model (str): The model to use for the batch job. src: Gemini Developer API supports inlined_requests, or file name. Example: "files/file_name". config (CreateBatchJobConfig): Optional configuration for the batch job. Returns: A BatchJob object that contains details about the batch job. Usage: .. code-block:: python batch_job = await client.aio.batches.create_embeddings( model="text-embedding-004", src="files/my_embedding_input", ) print(batch_job.state) """ import warnings warnings.warn( 'batches.create_embeddings() is experimental and may change without' ' notice.', category=_common.ExperimentalWarning, stacklevel=2, # This is crucial! 
) src = t.t_embedding_batch_job_source(self._api_client, src) # Convert all dicts to Pydantic objects. parameter_model = types._CreateEmbeddingsBatchJobParameters( model=model, src=src, config=config, ) http_options: Optional[types.HttpOptions] = None if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): http_options = parameter_model.config.http_options if self._api_client.vertexai: raise ValueError('Vertex AI does not support batches.create_embeddings.') else: return await self._create_embeddings(model=model, src=src, config=config) async def list( self, *, config: Optional[types.ListBatchJobsConfigOrDict] = None ) -> AsyncPager[types.BatchJob]: """Lists batch jobs asynchronously. Args: config (ListBatchJobsConfig): Optional configuration for the list request. Returns: A Pager object that contains one page of batch jobs. When iterating over the pager, it automatically fetches the next page if there are more. Usage: .. code-block:: python async for batch_job in await client.aio.batches.list(): print(batch_job.name) """ list_request = self._list return AsyncPager( 'batch_jobs', list_request, await self._list(config=config), config, )