# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Code generated by the Google Gen AI SDK generator DO NOT EDIT.

import io
import json
import logging
import os
from typing import Any, Callable, Optional, Union
from urllib.parse import urlencode

import google.auth

from . import _api_client
from . import _api_module
from . import _common
from . import _extra_utils
from . import _transformers as t
from . import types
from ._common import get_value_by_path as getv
from ._common import set_value_by_path as setv
from .pagers import AsyncPager, Pager

logger = logging.getLogger('google_genai.files')

# Header that carries the resumable-upload session URL on the create response.
_UPLOAD_URL_HEADER = 'x-goog-upload-url'


def _CreateFileParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for a create-file call (copies `file`)."""
  to_object: dict[str, Any] = {}
  file_value = getv(from_object, ['file'])
  if file_value is not None:
    setv(to_object, ['file'], file_value)
  return to_object


def _CreateFileResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Maps a create-file response (`sdkHttpResponse`) to snake_case keys."""
  to_object: dict[str, Any] = {}
  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(to_object, ['sdk_http_response'], http_response)
  return to_object


def _DeleteFileParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for delete; `name` becomes the `file` URL param."""
  to_object: dict[str, Any] = {}
  name = getv(from_object, ['name'])
  if name is not None:
    setv(to_object, ['_url', 'file'], t.t_file_name(name))
  return to_object


def _DeleteFileResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Maps a delete-file response (`sdkHttpResponse`) to snake_case keys."""
  to_object: dict[str, Any] = {}
  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(to_object, ['sdk_http_response'], http_response)
  return to_object


def _GetFileParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for get; `name` becomes the `file` URL param."""
  to_object: dict[str, Any] = {}
  name = getv(from_object, ['name'])
  if name is not None:
    setv(to_object, ['_url', 'file'], t.t_file_name(name))
  return to_object


def _ListFilesConfig_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Writes paging options into the `_query` section of `parent_object`.

  Nothing is written to the returned dict; `page_size` and `page_token` are
  set on `parent_object` as `pageSize` / `pageToken` query parameters.
  """
  to_object: dict[str, Any] = {}
  page_size = getv(from_object, ['page_size'])
  if page_size is not None:
    setv(parent_object, ['_query', 'pageSize'], page_size)

  page_token = getv(from_object, ['page_token'])
  if page_token is not None:
    setv(parent_object, ['_query', 'pageToken'], page_token)
  return to_object


def _ListFilesParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for list from the optional `config`."""
  to_object: dict[str, Any] = {}
  config = getv(from_object, ['config'])
  if config is not None:
    # The config converter writes its query parameters into `to_object`.
    _ListFilesConfig_to_mldev(config, to_object)
  return to_object


def _ListFilesResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Maps a list-files response to snake_case keys."""
  to_object: dict[str, Any] = {}
  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(to_object, ['sdk_http_response'], http_response)

  next_page_token = getv(from_object, ['nextPageToken'])
  if next_page_token is not None:
    setv(to_object, ['next_page_token'], next_page_token)

  files = getv(from_object, ['files'])
  if files is not None:
    setv(to_object, ['files'], list(files))
  return to_object


def _RegisterFilesParameters_to_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Builds the request dict for a register-files call (copies `uris`)."""
  to_object: dict[str, Any] = {}
  uris = getv(from_object, ['uris'])
  if uris is not None:
    setv(to_object, ['uris'], uris)
  return to_object


def _RegisterFilesResponse_from_mldev(
    from_object: Union[dict[str, Any], object],
    parent_object: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
  """Maps a register-files response to snake_case keys."""
  to_object: dict[str, Any] = {}
  http_response = getv(from_object, ['sdkHttpResponse'])
  if http_response is not None:
    setv(to_object, ['sdk_http_response'], http_response)

  files = getv(from_object, ['files'])
  if files is not None:
    setv(to_object, ['files'], list(files))
  return to_object


def _raise_if_vertexai(api_client: Any) -> None:
  """Raises ValueError when `api_client` targets Vertex AI."""
  if api_client.vertexai:
    raise ValueError(
        'This method is only supported in the Gemini Developer client.'
    )


def _prepare_request(
    api_client: Any,
    parameter_model: Any,
    to_mldev: Callable[..., dict[str, Any]],
    path_template: str,
) -> tuple[str, dict[str, Any], Optional[types.HttpOptions]]:
  """Turns a validated parameter model into `(path, body, http_options)`.

  Args:
    api_client: The module's API client; only its `vertexai` flag is read.
    parameter_model: Validated parameters; must expose a `config` attribute.
    to_mldev: Converter producing the request dict from `parameter_model`.
    path_template: URL path, optionally with `{placeholders}` that are filled
      from the `_url` section of the request dict.

  Returns:
    The request path (including any query string), the serializable request
    dict, and the per-request HTTP options (None when not configured).

  Raises:
    ValueError: If the client is a Vertex AI client.
  """
  _raise_if_vertexai(api_client)

  request_dict = to_mldev(parameter_model)
  url_params = request_dict.get('_url')
  path = path_template.format_map(url_params) if url_params else path_template
  query_params = request_dict.get('_query')
  if query_params:
    path = f'{path}?{urlencode(query_params)}'
  # TODO: remove the hack that pops config.
  request_dict.pop('config', None)

  http_options: Optional[types.HttpOptions] = None
  config = parameter_model.config
  if config is not None and config.http_options is not None:
    http_options = config.http_options

  request_dict = _common.convert_to_dict(request_dict)
  request_dict = _common.encode_unserializable_types(request_dict)
  return path, request_dict, http_options


def _wants_raw_http_response(parameter_model: Any) -> bool:
  """Tells whether the caller asked for the raw HTTP response.

  The flag is read from the validated `parameter_model.config` rather than
  from the raw `config` argument, so it is honored for dict configs too.
  """
  config = parameter_model.config
  if config is None:
    return False
  return bool(getattr(config, 'should_return_http_response', None))


def _build_response(
    api_client: Any,
    response: Any,
    parameter_model: Any,
    response_type: Any,
    from_mldev: Optional[Callable[..., dict[str, Any]]] = None,
    attach_headers: bool = False,
) -> Any:
  """Parses an HTTP response body into `response_type` and verifies it.

  Args:
    api_client: Client whose `_verify_response` is called on the result.
    response: HTTP response exposing `body` and `headers`.
    parameter_model: Request parameters, dumped and passed as `kwargs`.
    response_type: Type providing the `_from_response` constructor.
    from_mldev: Optional converter applied to the decoded body.
    attach_headers: When True, `sdk_http_response` is set on the result to a
      `types.HttpResponse` holding the response headers.

  Returns:
    The `response_type` instance built from the response.
  """
  response_dict = {} if not response.body else json.loads(response.body)
  if from_mldev is not None:
    response_dict = from_mldev(response_dict)

  return_value = response_type._from_response(
      response=response_dict, kwargs=parameter_model.model_dump()
  )
  if attach_headers:
    return_value.sdk_http_response = types.HttpResponse(
        headers=response.headers
    )
  api_client._verify_response(return_value)
  return return_value


def _prepare_upload(
    file: Union[str, os.PathLike[str], io.IOBase],
    config: Optional[types.UploadFileConfigOrDict],
) -> tuple[types.UploadFileConfig, types.File, Any]:
  """Builds the config model, file metadata and HTTP options for an upload.

  Returns:
    `(config_model, file_obj, http_options)` where `file_obj` carries the
    name (prefixed with `files/` when needed), display name, size and mime
    type, and `http_options` comes from
    `_extra_utils.prepare_resumable_upload`.
  """
  config_model = types.UploadFileConfig()
  if config:
    if isinstance(config, dict):
      config_model = types.UploadFileConfig(**config)
    else:
      config_model = config
    file_obj = types.File(
        mime_type=config_model.mime_type,
        name=config_model.name,
        display_name=config_model.display_name,
    )
  else:  # if not config
    file_obj = types.File()

  if file_obj.name is not None and not file_obj.name.startswith('files/'):
    file_obj.name = f'files/{file_obj.name}'

  http_options, size_bytes, mime_type = _extra_utils.prepare_resumable_upload(
      file,
      user_http_options=config_model.http_options,
      user_mime_type=config_model.mime_type,
  )
  file_obj.size_bytes = size_bytes
  file_obj.mime_type = mime_type
  return config_model, file_obj, http_options


def _extract_upload_url(response: types.CreateFileResponse) -> str:
  """Returns the upload URL carried in the create-file response headers.

  The exact lowercase key is preferred; otherwise the header name is matched
  case-insensitively so that `X-Goog-Upload-URL` (or any other casing) works
  for both the sync and async clients.

  Raises:
    KeyError: If the response has no headers or no upload URL header.
  """
  http_response = response.sdk_http_response
  headers = None if http_response is None else http_response.headers
  if headers is not None:
    if _UPLOAD_URL_HEADER in headers:
      return headers[_UPLOAD_URL_HEADER]
    for key, value in headers.items():
      if key.lower() == _UPLOAD_URL_HEADER:
        return value
  raise KeyError(
      'Failed to create file. Upload URL did not returned from the create'
      ' file request.'
  )


def _to_download_config(
    config: Optional[types.DownloadFileConfigOrDict],
) -> Optional[types.DownloadFileConfig]:
  """Normalizes a download config; falsy configs yield None."""
  if not config:
    return None
  if isinstance(config, dict):
    return types.DownloadFileConfig(**config)
  return config


def _download_path(file: Any) -> str:
  """Builds the `files/<name>:download?alt=media` request path for `file`."""
  name = t.t_file_name(file)
  return f'files/{name}:download?{urlencode({"alt": "media"})}'


def _check_credentials(auth: Any) -> None:
  """Raises ValueError unless `auth` is a google-auth Credentials object."""
  if not isinstance(auth, google.auth.credentials.Credentials):
    raise ValueError(
        'auth must be a google.auth.credentials.Credentials object.'
    )


def _copy_register_config(
    config: Optional[types.RegisterFilesConfigOrDict],
) -> types.RegisterFilesConfig:
  """Returns a validated deep copy so the caller's config is never mutated."""
  if config is None:
    validated = types.RegisterFilesConfig()
  else:
    validated = types.RegisterFilesConfig.model_validate(config)
  return validated.model_copy(deep=True)


def _attach_credentials(
    config: types.RegisterFilesConfig,
    auth: Any,
    token: Any,
) -> types.RegisterFilesConfig:
  """Adds bearer-token (and quota project) headers to `config` in place.

  Existing header names are lowercased first so the `authorization` and
  `x-goog-user-project` entries replace any differently-cased duplicates.
  """
  http_options = config.http_options or types.HttpOptions()
  headers = {k.lower(): v for k, v in (http_options.headers or {}).items()}
  headers['authorization'] = f'Bearer {token}'
  if auth.quota_project_id:
    headers['x-goog-user-project'] = auth.quota_project_id
  http_options.headers = headers
  config.http_options = http_options
  return config


class Files(_api_module.BaseModule):
  """Synchronous file-service methods (Gemini Developer client only)."""

  def _list(
      self, *, config: Optional[types.ListFilesConfigOrDict] = None
  ) -> types.ListFilesResponse:
    """Fetches a single page of files."""
    parameter_model = types._ListFilesParameters(
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _ListFilesParameters_to_mldev,
        'files',
    )
    response = self._api_client.request(
        'get', path, request_dict, http_options
    )
    return _build_response(
        self._api_client,
        response,
        parameter_model,
        types.ListFilesResponse,
        from_mldev=_ListFilesResponse_from_mldev,
        attach_headers=True,
    )

  def _create(
      self,
      *,
      file: types.FileOrDict,
      config: Optional[types.CreateFileConfigOrDict] = None,
  ) -> types.CreateFileResponse:
    """Sends the create-file request that starts an upload.

    When the config sets `should_return_http_response`, the raw HTTP response
    is returned in `sdk_http_response` instead of a parsed body.
    """
    parameter_model = types._CreateFileParameters(
        file=file,
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _CreateFileParameters_to_mldev,
        'upload/v1beta/files',
    )
    response = self._api_client.request(
        'post', path, request_dict, http_options
    )

    if _wants_raw_http_response(parameter_model):
      return_value = types.CreateFileResponse(sdk_http_response=response)
      self._api_client._verify_response(return_value)
      return return_value

    return _build_response(
        self._api_client,
        response,
        parameter_model,
        types.CreateFileResponse,
        from_mldev=_CreateFileResponse_from_mldev,
    )

  def get(
      self, *, name: str, config: Optional[types.GetFileConfigOrDict] = None
  ) -> types.File:
    """Retrieves the file information from the service.

    Args:
      name (str): The name identifier for the file to retrieve.
      config (GetFileConfig): Optional, configuration for the get method.

    Returns:
      File: The file information.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      file = client.files.get(name='files/...')
      print(file.uri)
    """
    parameter_model = types._GetFileParameters(
        name=name,
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _GetFileParameters_to_mldev,
        'files/{file}',
    )
    response = self._api_client.request(
        'get', path, request_dict, http_options
    )
    return _build_response(
        self._api_client, response, parameter_model, types.File
    )

  def delete(
      self, *, name: str, config: Optional[types.DeleteFileConfigOrDict] = None
  ) -> types.DeleteFileResponse:
    """Deletes a remotely stored file.

    Args:
      name (str): The name identifier for the file to delete.
      config (DeleteFileConfig): Optional, configuration for the delete method.

    Returns:
      DeleteFileResponse: The response for the delete method.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      client.files.delete(name='files/...')
    """
    parameter_model = types._DeleteFileParameters(
        name=name,
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _DeleteFileParameters_to_mldev,
        'files/{file}',
    )
    response = self._api_client.request(
        'delete', path, request_dict, http_options
    )
    return _build_response(
        self._api_client,
        response,
        parameter_model,
        types.DeleteFileResponse,
        from_mldev=_DeleteFileResponse_from_mldev,
        attach_headers=True,
    )

  def _register_files(
      self,
      *,
      uris: list[str],
      config: Optional[types.RegisterFilesConfigOrDict] = None,
  ) -> types.RegisterFilesResponse:
    """Sends the `files:register` request for `uris`.

    When the config sets `should_return_http_response`, the raw HTTP response
    is returned in `sdk_http_response` instead of a parsed body.
    """
    parameter_model = types._RegisterFilesParameters(
        uris=uris,
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _RegisterFilesParameters_to_mldev,
        'files:register',
    )
    response = self._api_client.request(
        'post', path, request_dict, http_options
    )

    if _wants_raw_http_response(parameter_model):
      return_value = types.RegisterFilesResponse(sdk_http_response=response)
      self._api_client._verify_response(return_value)
      return return_value

    return _build_response(
        self._api_client,
        response,
        parameter_model,
        types.RegisterFilesResponse,
        from_mldev=_RegisterFilesResponse_from_mldev,
    )

  def upload(
      self,
      *,
      file: Union[str, os.PathLike[str], io.IOBase],
      config: Optional[types.UploadFileConfigOrDict] = None,
  ) -> types.File:
    """Calls the API to upload a file using a supported file service.

    Args:
      file: A path to the file or an `IOBase` object to be uploaded. If it's an
        IOBase object, it must be opened in blocking (the default) mode and
        binary mode. In other words, do not use non-blocking mode or text mode.
        The given stream must be seekable, that is, it must support `seek()`.
      config: Optional parameters to set `display_name`, `mime_type`, and
        `name`.

    Returns:
      File: The file information reported by the service after the upload.

    Raises:
      ValueError: If the client is a Vertex AI client.
      KeyError: If the create-file response carries no upload URL header.
    """
    _raise_if_vertexai(self._api_client)
    config_model, file_obj, http_options = _prepare_upload(file, config)

    response = self._create(
        file=file_obj,
        config=types.CreateFileConfig(
            http_options=http_options, should_return_http_response=True
        ),
    )
    upload_url = _extract_upload_url(response)

    # Streams are uploaded as-is; anything else is treated as a path.
    source = file if isinstance(file, io.IOBase) else os.fspath(file)
    return_file = self._api_client.upload_file(
        source, upload_url, file_obj.size_bytes, http_options=http_options
    )

    return types.File._from_response(
        response=return_file.json['file'],
        kwargs=config_model.model_dump() if config else {},
    )

  def download(
      self,
      *,
      file: Union[str, types.File, types.Video, types.GeneratedVideo],
      config: Optional[types.DownloadFileConfigOrDict] = None,
  ) -> bytes:
    """Downloads a file's data from storage.

    Files created by `upload` can't be downloaded. You can tell which files are
    downloadable by checking the `source` or `download_uri` property.

    Note: This method returns the data as bytes. For `Video` and
    `GeneratedVideo` objects there is an additional side effect, that it also
    sets the `video_bytes` property on the `Video` object.

    Args:
      file (str): A file name, uri, or file object. Identifying which file to
        download.
      config (DownloadFileConfigOrDict): Optional, configuration for the
        download method.

    Returns:
      bytes: The file data.

    Raises:
      ValueError: If the client is a Vertex AI client, or if `file` is a
        `File` object without a `download_uri`.

    Usage:

    .. code-block:: python

      for file in client.files.list():
        if file.download_uri is not None:
          break
      else:
        raise ValueError('No files found with a `download_uri`.')
      data = client.files.download(file=file)
      # data = client.files.download(file=file.name)
      # data = client.files.download(file=file.download_uri)

      video = types.Video(uri=file.uri)
      video_bytes = client.files.download(file=video)
      video.video_bytes
    """
    _raise_if_vertexai(self._api_client)
    config_model = _to_download_config(config)

    if isinstance(file, types.File) and file.download_uri is None:
      raise ValueError(
          "Only generated files can be downloaded, uploaded files can't be "
          'downloaded. You can tell which files are downloadable by checking '
          'the `source` or `download_uri` property.'
      )

    path = _download_path(file)
    http_options = getv(config_model, ['http_options'])

    data = self._api_client.download_file(
        path,
        http_options=http_options,
    )

    # Mirror the downloaded bytes onto video objects for convenience.
    if isinstance(file, types.Video):
      file.video_bytes = data
    elif isinstance(file, types.GeneratedVideo) and file.video is not None:
      file.video.video_bytes = data

    return data

  def register_files(
      self,
      *,
      auth: google.auth.credentials.Credentials,
      uris: list[str],
      config: Optional[types.RegisterFilesConfigOrDict] = None,
  ) -> types.RegisterFilesResponse:
    """Registers gcs files with the file service.

    Args:
      auth: Credentials used to obtain the bearer token sent with the request.
        When they carry a quota project, it is sent as `x-goog-user-project`.
      uris: The URIs of the files to register.
      config: Optional configuration; it is copied, not mutated.

    Returns:
      RegisterFilesResponse: The response for the register method.

    Raises:
      ValueError: If `auth` is not a `google.auth.credentials.Credentials`
        object, or if the client is a Vertex AI client.
    """
    _check_credentials(auth)
    config_copy = _copy_register_config(config)
    token = _api_client.get_token_from_credentials(self._api_client, auth)
    config_copy = _attach_credentials(config_copy, auth, token)
    return self._register_files(uris=uris, config=config_copy)

  def list(
      self, *, config: Optional[types.ListFilesConfigOrDict] = None
  ) -> Pager[types.File]:
    """Lists all files from the service.

    Args:
      config (ListFilesConfig): Optional, configuration for the list method.

    Returns:
      A Pager object that contains one page of files. When iterating over the
      pager, it automatically fetches the next page if there are more.

    Usage:

    .. code-block:: python

      for file in client.files.list(config={'page_size': 10}):
        print(file.name)
    """
    list_request = self._list
    return Pager(
        'files',
        list_request,
        self._list(config=config),
        config,
    )


class AsyncFiles(_api_module.BaseModule):
  """Asynchronous file-service methods (Gemini Developer client only)."""

  async def _list(
      self, *, config: Optional[types.ListFilesConfigOrDict] = None
  ) -> types.ListFilesResponse:
    """Fetches a single page of files."""
    parameter_model = types._ListFilesParameters(
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _ListFilesParameters_to_mldev,
        'files',
    )
    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )
    return _build_response(
        self._api_client,
        response,
        parameter_model,
        types.ListFilesResponse,
        from_mldev=_ListFilesResponse_from_mldev,
        attach_headers=True,
    )

  async def _create(
      self,
      *,
      file: types.FileOrDict,
      config: Optional[types.CreateFileConfigOrDict] = None,
  ) -> types.CreateFileResponse:
    """Sends the create-file request that starts an upload.

    When the config sets `should_return_http_response`, the raw HTTP response
    is returned in `sdk_http_response` instead of a parsed body.
    """
    parameter_model = types._CreateFileParameters(
        file=file,
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _CreateFileParameters_to_mldev,
        'upload/v1beta/files',
    )
    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    if _wants_raw_http_response(parameter_model):
      return_value = types.CreateFileResponse(sdk_http_response=response)
      self._api_client._verify_response(return_value)
      return return_value

    return _build_response(
        self._api_client,
        response,
        parameter_model,
        types.CreateFileResponse,
        from_mldev=_CreateFileResponse_from_mldev,
    )

  async def get(
      self, *, name: str, config: Optional[types.GetFileConfigOrDict] = None
  ) -> types.File:
    """Retrieves the file information from the service.

    Args:
      name (str): The name identifier for the file to retrieve.
      config (GetFileConfig): Optional, configuration for the get method.

    Returns:
      File: The file information.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      file = await client.aio.files.get(name='files/...')
      print(file.uri)
    """
    parameter_model = types._GetFileParameters(
        name=name,
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _GetFileParameters_to_mldev,
        'files/{file}',
    )
    response = await self._api_client.async_request(
        'get', path, request_dict, http_options
    )
    return _build_response(
        self._api_client, response, parameter_model, types.File
    )

  async def delete(
      self, *, name: str, config: Optional[types.DeleteFileConfigOrDict] = None
  ) -> types.DeleteFileResponse:
    """Deletes a remotely stored file.

    Args:
      name (str): The name identifier for the file to delete.
      config (DeleteFileConfig): Optional, configuration for the delete method.

    Returns:
      DeleteFileResponse: The response for the delete method.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      await client.aio.files.delete(name='files/...')
    """
    parameter_model = types._DeleteFileParameters(
        name=name,
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _DeleteFileParameters_to_mldev,
        'files/{file}',
    )
    response = await self._api_client.async_request(
        'delete', path, request_dict, http_options
    )
    return _build_response(
        self._api_client,
        response,
        parameter_model,
        types.DeleteFileResponse,
        from_mldev=_DeleteFileResponse_from_mldev,
        attach_headers=True,
    )

  async def _register_files(
      self,
      *,
      uris: list[str],
      config: Optional[types.RegisterFilesConfigOrDict] = None,
  ) -> types.RegisterFilesResponse:
    """Sends the `files:register` request for `uris`.

    When the config sets `should_return_http_response`, the raw HTTP response
    is returned in `sdk_http_response` instead of a parsed body.
    """
    parameter_model = types._RegisterFilesParameters(
        uris=uris,
        config=config,
    )
    path, request_dict, http_options = _prepare_request(
        self._api_client,
        parameter_model,
        _RegisterFilesParameters_to_mldev,
        'files:register',
    )
    response = await self._api_client.async_request(
        'post', path, request_dict, http_options
    )

    if _wants_raw_http_response(parameter_model):
      return_value = types.RegisterFilesResponse(sdk_http_response=response)
      self._api_client._verify_response(return_value)
      return return_value

    return _build_response(
        self._api_client,
        response,
        parameter_model,
        types.RegisterFilesResponse,
        from_mldev=_RegisterFilesResponse_from_mldev,
    )

  async def upload(
      self,
      *,
      file: Union[str, os.PathLike[str], io.IOBase],
      config: Optional[types.UploadFileConfigOrDict] = None,
  ) -> types.File:
    """Calls the API to upload a file asynchronously using a supported file service.

    Args:
      file: A path to the file or an `IOBase` object to be uploaded. If it's an
        IOBase object, it must be opened in blocking (the default) mode and
        binary mode. In other words, do not use non-blocking mode or text mode.
        The given stream must be seekable, that is, it must support `seek()`.
      config: Optional parameters to set `display_name`, `mime_type`, and
        `name`.

    Returns:
      File: The file information reported by the service after the upload.

    Raises:
      ValueError: If the client is a Vertex AI client.
      KeyError: If the create-file response carries no upload URL header.
    """
    _raise_if_vertexai(self._api_client)
    config_model, file_obj, http_options = _prepare_upload(file, config)

    response = await self._create(
        file=file_obj,
        config=types.CreateFileConfig(
            http_options=http_options, should_return_http_response=True
        ),
    )
    upload_url = _extract_upload_url(response)

    # Streams are uploaded as-is; anything else is treated as a path.
    source = file if isinstance(file, io.IOBase) else os.fspath(file)
    return_file = await self._api_client.async_upload_file(
        source, upload_url, file_obj.size_bytes, http_options=http_options
    )

    return types.File._from_response(
        response=return_file.json['file'],
        kwargs=config_model.model_dump() if config else {},
    )

  async def download(
      self,
      *,
      file: Union[str, types.File],
      config: Optional[types.DownloadFileConfigOrDict] = None,
  ) -> bytes:
    """Downloads a file's data from the file service.

    The Vertex-AI implementation of the API does not include the file service.

    Files created by `upload` can't be downloaded. You can tell which files are
    downloadable by checking the `download_uri` property.

    Args:
      file (str): A file name, uri, or file object. Identifying which file to
        download.
      config (DownloadFileConfigOrDict): Optional, configuration for the
        download method.

    Returns:
      bytes: The file data.

    Raises:
      ValueError: If the client is a Vertex AI client.

    Usage:

    .. code-block:: python

      async for file in await client.aio.files.list():
        if file.download_uri is not None:
          break
      else:
        raise ValueError('No files found with a `download_uri`.')
      data = await client.aio.files.download(file=file)
      # data = await client.aio.files.download(file=file.name)
      # data = await client.aio.files.download(file=file.uri)
    """
    _raise_if_vertexai(self._api_client)
    config_model = _to_download_config(config)

    path = _download_path(file)
    http_options = getv(config_model, ['http_options'])

    data = await self._api_client.async_download_file(
        path,
        http_options=http_options,
    )

    return data

  async def register_files(
      self,
      *,
      auth: google.auth.credentials.Credentials,
      uris: list[str],
      config: Optional[types.RegisterFilesConfigOrDict] = None,
  ) -> types.RegisterFilesResponse:
    """Registers gcs files with the file service.

    Args:
      auth: Credentials used to obtain the bearer token sent with the request.
        When they carry a quota project, it is sent as `x-goog-user-project`.
      uris: The URIs of the files to register.
      config: Optional configuration; it is copied, not mutated.

    Returns:
      RegisterFilesResponse: The response for the register method.

    Raises:
      ValueError: If `auth` is not a `google.auth.credentials.Credentials`
        object, or if the client is a Vertex AI client.
    """
    _check_credentials(auth)
    config_copy = _copy_register_config(config)
    token = await _api_client.async_get_token_from_credentials(
        self._api_client, auth
    )
    config_copy = _attach_credentials(config_copy, auth, token)
    return await self._register_files(uris=uris, config=config_copy)

  async def list(
      self, *, config: Optional[types.ListFilesConfigOrDict] = None
  ) -> AsyncPager[types.File]:
    """Lists all files from the service asynchronously.

    Args:
      config (ListFilesConfig): Optional, configuration for the list method.

    Returns:
      A Pager object that contains one page of files. When iterating over the
      pager, it automatically fetches the next page if there are more.

    Usage:

    .. code-block:: python

      async for file in await client.aio.files.list(config={'page_size': 10}):
        print(file.name)
    """
    list_request = self._list
    return AsyncPager(
        'files',
        list_request,
        await self._list(config=config),
        config,
    )