openai代码研读:OpenAI Python SDK 中 AsyncOpenAI 类的定义
该代码展示了AsyncOpenAI客户端的实现,它支持异步调用OpenAI的API服务。客户端初始化时,会自动从环境变量获取API密钥和基础URL(若无则使用默认值)。它提供了多种功能模块(如聊天、嵌入、文件处理等)的异步接口,并支持原始响应和流式响应处理。使用时需传入API密钥,可通过环境变量或直接参数设置。示例展示了如何初始化客户端并调用聊天功能生成文本。
·
代码
class AsyncOpenAI(AsyncAPIClient):
completions: completions.AsyncCompletions
chat: chat.AsyncChat
embeddings: embeddings.AsyncEmbeddings
files: files.AsyncFiles
images: images.AsyncImages
audio: audio.AsyncAudio
moderations: moderations.AsyncModerations
models: models.AsyncModels
fine_tuning: fine_tuning.AsyncFineTuning
vector_stores: vector_stores.AsyncVectorStores
beta: beta.AsyncBeta
batches: batches.AsyncBatches
uploads: uploads.AsyncUploads
responses: responses.AsyncResponses
with_raw_response: AsyncOpenAIWithRawResponse
with_streaming_response: AsyncOpenAIWithStreamedResponse
# client options
api_key: str
organization: str | None
project: str | None
websocket_base_url: str | httpx.URL | None
"""Base URL for WebSocket connections.
If not specified, the default base URL will be used, with 'wss://' replacing the
'http://' or 'https://' scheme. For example: 'http://example.com' becomes
'wss://example.com'
"""
def __init__(
self,
*,
api_key: str | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | httpx.URL | None = None,
websocket_base_url: str | httpx.URL | None = None,
timeout: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
max_retries: int = DEFAULT_MAX_RETRIES,
default_headers: Mapping[str, str] | None = None,
default_query: Mapping[str, object] | None = None,
# Configure a custom httpx client.
# We provide a `DefaultAsyncHttpxClient` class that you can pass to retain the default values we use for `limits`, `timeout` & `follow_redirects`.
# See the [httpx documentation](https://www.python-httpx.org/api/#asyncclient) for more details.
http_client: httpx.AsyncClient | None = None,
# Enable or disable schema validation for data returned by the API.
# When enabled an error APIResponseValidationError is raised
# if the API responds with invalid data for the expected schema.
#
# This parameter may be removed or changed in the future.
# If you rely on this feature, please open a GitHub issue
# outlining your use-case to help us decide if it should be
# part of our public interface in the future.
_strict_response_validation: bool = False,
) -> None:
"""Construct a new async AsyncOpenAI client instance.
This automatically infers the following arguments from their corresponding environment variables if they are not provided:
- `api_key` from `OPENAI_API_KEY`
- `organization` from `OPENAI_ORG_ID`
- `project` from `OPENAI_PROJECT_ID`
"""
if api_key is None:
api_key = os.environ.get("OPENAI_API_KEY")
if api_key is None:
raise OpenAIError(
"The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable"
)
self.api_key = api_key
if organization is None:
organization = os.environ.get("OPENAI_ORG_ID")
self.organization = organization
if project is None:
project = os.environ.get("OPENAI_PROJECT_ID")
self.project = project
self.websocket_base_url = websocket_base_url
if base_url is None:
base_url = os.environ.get("OPENAI_BASE_URL")
if base_url is None:
base_url = f"https://api.openai.com/v1"
super().__init__(
version=__version__,
base_url=base_url,
max_retries=max_retries,
timeout=timeout,
http_client=http_client,
custom_headers=default_headers,
custom_query=default_query,
_strict_response_validation=_strict_response_validation,
)
self._default_stream_cls = AsyncStream
self.completions = completions.AsyncCompletions(self)
self.chat = chat.AsyncChat(self)
self.embeddings = embeddings.AsyncEmbeddings(self)
self.files = files.AsyncFiles(self)
self.images = images.AsyncImages(self)
self.audio = audio.AsyncAudio(self)
self.moderations = moderations.AsyncModerations(self)
self.models = models.AsyncModels(self)
self.fine_tuning = fine_tuning.AsyncFineTuning(self)
self.vector_stores = vector_stores.AsyncVectorStores(self)
self.beta = beta.AsyncBeta(self)
self.batches = batches.AsyncBatches(self)
self.uploads = uploads.AsyncUploads(self)
self.responses = responses.AsyncResponses(self)
self.with_raw_response = AsyncOpenAIWithRawResponse(self)
self.with_streaming_response = AsyncOpenAIWithStreamedResponse(self)
@property
@override
def qs(self) -> Querystring:
return Querystring(array_format="brackets")
@property
@override
def auth_headers(self) -> dict[str, str]:
api_key = self.api_key
return {"Authorization": f"Bearer {api_key}"}
@property
@override
def default_headers(self) -> dict[str, str | Omit]:
return {
**super().default_headers,
"X-Stainless-Async": f"async:{get_async_library()}",
"OpenAI-Organization": self.organization if self.organization is not None else Omit(),
"OpenAI-Project": self.project if self.project is not None else Omit(),
**self._custom_headers,
}
def copy(
self,
*,
api_key: str | None = None,
organization: str | None = None,
project: str | None = None,
websocket_base_url: str | httpx.URL | None = None,
base_url: str | httpx.URL | None = None,
timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
http_client: httpx.AsyncClient | None = None,
max_retries: int | NotGiven = NOT_GIVEN,
default_headers: Mapping[str, str] | None = None,
set_default_headers: Mapping[str, str] | None = None,
default_query: Mapping[str, object] | None = None,
set_default_query: Mapping[str, object] | None = None,
_extra_kwargs: Mapping[str, Any] = {},
) -> Self:
"""
Create a new client instance re-using the same options given to the current client with optional overriding.
"""
if default_headers is not None and set_default_headers is not None:
raise ValueError("The `default_headers` and `set_default_headers` arguments are mutually exclusive")
if default_query is not None and set_default_query is not None:
raise ValueError("The `default_query` and `set_default_query` arguments are mutually exclusive")
headers = self._custom_headers
if default_headers is not None:
headers = {**headers, **default_headers}
elif set_default_headers is not None:
headers = set_default_headers
params = self._custom_query
if default_query is not None:
params = {**params, **default_query}
elif set_default_query is not None:
params = set_default_query
http_client = http_client or self._client
return self.__class__(
api_key=api_key or self.api_key,
organization=organization or self.organization,
project=project or self.project,
websocket_base_url=websocket_base_url or self.websocket_base_url,
base_url=base_url or self.base_url,
timeout=self.timeout if isinstance(timeout, NotGiven) else timeout,
http_client=http_client,
max_retries=max_retries if is_given(max_retries) else self.max_retries,
default_headers=headers,
default_query=params,
**_extra_kwargs,
)
# Alias for `copy` for nicer inline usage, e.g.
# client.with_options(timeout=10).foo.create(...)
with_options = copy
@override
def _make_status_error(
self,
err_msg: str,
*,
body: object,
response: httpx.Response,
) -> APIStatusError:
data = body.get("error", body) if is_mapping(body) else body
if response.status_code == 400:
return _exceptions.BadRequestError(err_msg, response=response, body=data)
if response.status_code == 401:
return _exceptions.AuthenticationError(err_msg, response=response, body=data)
if response.status_code == 403:
return _exceptions.PermissionDeniedError(err_msg, response=response, body=data)
if response.status_code == 404:
return _exceptions.NotFoundError(err_msg, response=response, body=data)
if response.status_code == 409:
return _exceptions.ConflictError(err_msg, response=response, body=data)
if response.status_code == 422:
return _exceptions.UnprocessableEntityError(err_msg, response=response, body=data)
if response.status_code == 429:
return _exceptions.RateLimitError(err_msg, response=response, body=data)
if response.status_code >= 500:
return _exceptions.InternalServerError(err_msg, response=response, body=data)
return APIStatusError(err_msg, response=response, body=data)
从中可以看到,如果没有传递api_key和base_url参数,那么会自动从环境变量中找OPENAI_API_KEY 和OPENAI_BASE_URL
如果找不到OPENAI_API_KEY ,会报错
如果找不到OPENAI_BASE_URL,会自动用默认地址:https://api.openai.com/v1
if api_key is None:
api_key = os.environ.get("OPENAI_API_KEY")
if api_key is None:
raise OpenAIError(
"The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable"
)
self.api_key = api_key
if organization is None:
organization = os.environ.get("OPENAI_ORG_ID")
self.organization = organization
if project is None:
project = os.environ.get("OPENAI_PROJECT_ID")
self.project = project
self.websocket_base_url = websocket_base_url
if base_url is None:
base_url = os.environ.get("OPENAI_BASE_URL")
if base_url is None:
base_url = f"https://api.openai.com/v1"
调用例子
# 基本初始化
client = AsyncOpenAI(api_key="your-api-key")
# 使用聊天功能
response = await client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}]
)
# 使用原始响应
raw_client = client.with_raw_response
raw_response = await raw_client.chat.completions.create(...)
更多推荐


所有评论(0)