使用OpenAI API 做 Book Summary(二):编写file_process.py和prompt.py


file_process.py

源代码

import os
import tiktoken
from prompt import chunk_prompt_messages
from typing import List, Tuple


# https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
def num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613"):
    """Count the prompt tokens consumed by a list of chat messages.

    Adapted from the OpenAI cookbook notebook linked above this function.

    Args:
        messages: Iterable of message dicts. Every value of every dict is
            passed to the tokenizer, so values are expected to be strings.
        model: Model name; selects the tokenizer and the per-message overhead.

    Returns:
        The total token count, including the 3 tokens that prime the reply.

    Raises:
        NotImplementedError: If ``model`` is neither a known snapshot nor a
            name containing "gpt-3.5-turbo" or "gpt-4".
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        print("Warning: model not found. Using cl100k_base encoding.")
        encoding = tiktoken.get_encoding("cl100k_base")

    # Snapshots that share the same fixed overhead per message / per name.
    pinned_snapshots = frozenset((
        "gpt-3.5-turbo-0613",
        "gpt-3.5-turbo-16k-0613",
        "gpt-4-0314",
        "gpt-4-32k-0314",
        "gpt-4-0613",
        "gpt-4-32k-0613",
    ))

    if model in pinned_snapshots:
        overhead_per_message, overhead_per_name = 3, 1
    elif model == "gpt-3.5-turbo-0301":
        # Every message follows <|start|>{role/name}\n{content}<|end|>\n and,
        # if there's a name, the role is omitted (hence the -1).
        overhead_per_message, overhead_per_name = 4, -1
    else:
        # Unpinned aliases are counted with the rules of a pinned snapshot.
        if "gpt-3.5-turbo" in model:
            print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
            return num_tokens_from_messages(messages, model="gpt-3.5-turbo-0613")
        if "gpt-4" in model:
            print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
            return num_tokens_from_messages(messages, model="gpt-4-0613")
        raise NotImplementedError(
            f"""num_tokens_from_messages() is not implemented for model {model}. 
            See https://github.com/openai/openai-python/blob/main/chatml.md 
            for information on how messages are converted to tokens."""
        )

    # Every reply is primed with <|start|>assistant<|message|> (3 tokens).
    total = 3
    for message in messages:
        total += overhead_per_message
        total += sum(len(encoding.encode(value)) for value in message.values())
        if "name" in message:
            total += overhead_per_name
    return total


class FileProcessor:
    """Read a text file and split it into chunks sized for per-chunk summarization.

    Construction does all the work eagerly: the file is read, the per-chunk
    token budget is computed, the cache file name is derived and the text is
    split into chunks (available as ``self.chunks``).
    """

    def __init__(self,
                 file_path: str,
                 chunk_summary_size: int,
                 summary_size: int,
                 division_point: str = "\n\n\n"):
        """Load ``file_path`` and pre-compute its chunks.

        Args:
            file_path: Path of the text file to process.
            chunk_summary_size: Number of tokens reserved for each chunk's summary.
            summary_size: Token size for the final summary; only stored here.
            division_point: Separator at which the text is preferably split.
        """
        self.file_path = file_path
        self.chunk_summary_size = chunk_summary_size
        self.summary_size = summary_size
        self.division_point = division_point
        self.model = "gpt-3.5-turbo-1106"
        # Order matters: chunking needs both the text and the token budget.
        self.text = self._get_text()
        self.chunk_input_tokens = self._compute_input_tokens()
        self.cache_file = self._get_cache_name()
        self.chunks = self._split_text_into_chunks()

    def _get_text(self):
        """Return the file content decoded as GBK, silently dropping undecodable bytes."""
        with open(self.file_path, "rb") as f:
            txt = f.read()
            return txt.decode("gbk", "ignore")

    def _compute_input_tokens(self):
        """Return how many tokens of book text fit into one chunk request.

        The budget is the hard-coded total limit (16385) minus the tokens of
        the chunk prompt with empty text and minus the tokens reserved for
        the chunk summary.
        """
        base_chunk_prompt_tokens = num_tokens_from_messages(
            chunk_prompt_messages("", self.chunk_summary_size),
            model="gpt-3.5-turbo-0613",
        )
        chunk_input_tokens = 16385 - (base_chunk_prompt_tokens + self.chunk_summary_size)
        return chunk_input_tokens

    def _get_cache_name(self):
        """Return the cache path ``cache/<stem>.json`` for the input file.

        Only the final extension is removed, so ``my.book.txt`` maps to
        ``cache/my.book.json`` and does not collide with ``my.other.txt``.
        """
        file_name = os.path.basename(self.file_path)
        stem = os.path.splitext(file_name)[0]
        cache_file = f"cache/{stem}.json"
        return cache_file

    def _take_tokens(self, text: str) -> Tuple[str, str]:
        """Cut one chunk off the front of ``text``.

        Sections separated by ``self.division_point`` are accumulated while
        their estimated token count stays below ``self.chunk_input_tokens``.
        If even the first non-blank section is too large, the text is cut at
        a token boundary instead, so splitting inside a word is acceptable.

        Args:
            text: The not yet chunked remainder of the document.

        Returns:
            A tuple ``(chunk, remainder)``. ``chunk`` is a best-effort piece of
            fewer than ``self.chunk_input_tokens`` tokens; for non-empty
            ``text`` it is never empty, so repeated calls always terminate.
        """
        enc = tiktoken.encoding_for_model(self.model)
        sections = text.split(self.division_point)
        separator_tokens = len(enc.encode(self.division_point))  # loop invariant
        current_token_count = 0
        has_content = False  # True once a non-blank section has been accepted

        # Iterate the unfiltered list so that `i` is a valid index into `sections`.
        for i, section in enumerate(sections):
            section_tokens = len(enc.encode(section))
            is_blank = section.strip() == ""
            # Blank sections never trigger a split; they just travel with the chunk.
            if not is_blank and current_token_count + section_tokens >= self.chunk_input_tokens:
                if has_content:
                    # Sections [0, i) fit; section i opens the next chunk.
                    emit = self.division_point.join(sections[:i])
                    remainder = self.division_point.join(sections[i:])
                    return emit, remainder
                # The first real section alone exceeds the budget: cut at a
                # token boundary, even if it ends on an awkward split.
                budget = max(self.chunk_input_tokens, 1)
                max_token_chunk = enc.decode(enc.encode(text)[:budget])
                # A cut inside a multi-byte character decodes to U+FFFD; drop
                # it so the chunk is a true prefix and no character is lost.
                max_token_chunk = max_token_chunk.rstrip("\ufffd")
                if not max_token_chunk:
                    max_token_chunk = text[:1]  # guarantee forward progress
                return max_token_chunk, text[len(max_token_chunk):]
            current_token_count += section_tokens + separator_tokens
            has_content = has_content or not is_blank

        return text, ""

    def _split_text_into_chunks(self) -> List[str]:
        """Split ``self.text`` into consecutive chunks via ``_take_tokens``.

        Returns:
            The chunks in document order; an empty list for empty text.
        """
        text = self.text
        chunks = []
        while text:
            section, text = self._take_tokens(text)
            chunks.append(section)
        return chunks

功能讲解

获取文本

_get_text()函数用来获取文件的文本。

def _get_text(self):
    """Return the content of ``self.file_path`` decoded as GBK.

    The file is read as raw bytes; bytes that are not valid GBK are dropped.
    """
    with open(self.file_path, "rb") as handle:
        raw_bytes = handle.read()
    return raw_bytes.decode("gbk", errors="ignore")

本项目只是简单地读取.txt文件的文本(按GBK解码,无法解码的字节会被忽略),您也可以扩展这部分的内容,读取其他编码或其他格式的文本。

计算输入token

_compute_input_tokens()函数用来计算可以输入的文本chunk的token大小。

def _compute_input_tokens(self):
    """Return how many tokens of book text fit into one chunk request.

    The result is the hard-coded total limit (16385) minus the tokens of the
    chunk prompt with empty text and minus the tokens reserved for the summary.
    """
    empty_prompt = chunk_prompt_messages("", self.chunk_summary_size)
    prompt_overhead = num_tokens_from_messages(empty_prompt, model="gpt-3.5-turbo-0613")
    reserved_tokens = prompt_overhead + self.chunk_summary_size
    return 16385 - reserved_tokens

其中,base_chunk_prompt_tokens计算的是不带任何文本内容的prompt的token大小,这里又调用了两个函数: num_tokens_from_messages()和chunk_prompt_messages()。

  • num_tokens_from_messages(),用以计算messages的token数量,在openai的cookbook里面定义的,我这里直接copy过来使用,需要导入tiktoken模块。如果想了解具体的原理,请参考How to count tokens with tiktoken

  • chunk_prompt_messages(),在prompt.py中定义,返回一个messages,用来对chunk进行summary,在后续会详细讲到。

  • 模型这里我直接使用了硬编码,采用最新的"gpt-3.5-turbo-0613",价格便宜量又足,这里您也可以写成一个可输入的变量。

然后,用16385(即16k上下文的gpt-3.5-turbo模型的token上限,代码中self.model设置的"gpt-3.5-turbo-1106"就属于这一类)减去base_chunk_prompt_tokens和chunk_summary_size,就得到了可输入的chunk的最大token数量。

获取cache文件名

_get_cache_name()函数获得对应的cache文件名,用于后续的cache检查。

def _get_cache_name(self):
    """Return the cache path ``cache/<stem>.json`` for the input file.

    Only the final extension is removed, so ``my.book.txt`` maps to
    ``cache/my.book.json`` and does not collide with ``my.other.txt``.
    """
    file_name = os.path.basename(self.file_path)
    stem = os.path.splitext(file_name)[0]
    cache_file = f"cache/{stem}.json"
    return cache_file
将文本分成chunks

这个功能分成两步进行:

  • _take_tokens(): 返回一个二元组:第一个符合条件的chunk,以及剩余部分的文本。这里有个技巧,我们不能直接用chunk_input_tokens将文本分段,因为会在句子中间截断,造成意思的不连贯,而是应该尽量截断整段或者至少是整句。这时候,division_point这个变量就派上用场了,表示用于截断的分隔符,默认是"\n\n\n"。
def _take_tokens(self, text: str) -> Tuple[str, str]:
    """Cut one chunk off the front of ``text``.

    Sections separated by ``self.division_point`` are accumulated while
    their estimated token count stays below ``self.chunk_input_tokens``.
    If even the first non-blank section is too large, the text is cut at
    a token boundary instead, so splitting inside a word is acceptable.

    Args:
        text: The not yet chunked remainder of the document.

    Returns:
        A tuple ``(chunk, remainder)``. ``chunk`` is a best-effort piece of
        fewer than ``self.chunk_input_tokens`` tokens; for non-empty
        ``text`` it is never empty, so repeated calls always terminate.
    """
    enc = tiktoken.encoding_for_model(self.model)
    sections = text.split(self.division_point)
    separator_tokens = len(enc.encode(self.division_point))  # loop invariant
    current_token_count = 0
    has_content = False  # True once a non-blank section has been accepted

    # Iterate the unfiltered list so that `i` is a valid index into `sections`.
    for i, section in enumerate(sections):
        section_tokens = len(enc.encode(section))
        is_blank = section.strip() == ""
        # Blank sections never trigger a split; they just travel with the chunk.
        if not is_blank and current_token_count + section_tokens >= self.chunk_input_tokens:
            if has_content:
                # Sections [0, i) fit; section i opens the next chunk.
                emit = self.division_point.join(sections[:i])
                remainder = self.division_point.join(sections[i:])
                return emit, remainder
            # The first real section alone exceeds the budget: cut at a
            # token boundary, even if it ends on an awkward split.
            budget = max(self.chunk_input_tokens, 1)
            max_token_chunk = enc.decode(enc.encode(text)[:budget])
            # A cut inside a multi-byte character decodes to U+FFFD; drop
            # it so the chunk is a true prefix and no character is lost.
            max_token_chunk = max_token_chunk.rstrip("\ufffd")
            if not max_token_chunk:
                max_token_chunk = text[:1]  # guarantee forward progress
            return max_token_chunk, text[len(max_token_chunk):]
        current_token_count += section_tokens + separator_tokens
        has_content = has_content or not is_blank

    return text, ""
  • _split_text_into_chunks(),通过一个while循环反复调用 _take_tokens(),每次取出一个chunk,直到文本被取完,从而得到全部文本的chunks。到这里,文件的处理工作就完成了。
def _split_text_into_chunks(self) -> List[str]:
    """Split ``self.text`` into consecutive chunks.

    ``self._take_tokens`` is called repeatedly; each call yields the next
    chunk and the text still to be processed, until nothing is left.

    Returns:
        The chunks in document order; an empty list for empty text.
    """
    pieces: List[str] = []
    remaining = self.text
    while remaining:
        head, remaining = self._take_tokens(remaining)
        pieces.append(head)
    return pieces

prompt.py

源代码

def chunk_prompt_messages(text: str, chunk_summary_size: int) -> list:
    """Build the chat messages that ask the model to summarize one chunk.

    Args:
        text: The chunk of book text to summarize.
        chunk_summary_size: Token limit quoted to the model in the instructions.

    Returns:
        A two-element list: the system instructions followed by the user
        message carrying ``text``.
    """
    instructions = f"""
The user is asking you to summarize a book. 
Because the book it too long you are being asked to summarize one chunk at a time. 
In your summary, make no mention of the "chunks" or "passages" used in dividing the text for summarization.
Strive to make your summary as detailed as possible while remaining under a {chunk_summary_size} token limit.
Please give the result in the original language of the text.
""".strip()
    system_message = {"role": "system", "content": instructions}
    user_message = {"role": "user", "content": f"Summarize the following: {text}"}
    return [system_message, user_message]


def synthesis_prompt_messages(summaries_joined: str, summary_size: int) -> list:
    """Build the chat messages that merge the chunk summaries into one summary.

    Args:
        summaries_joined: All chunk summaries concatenated into one string.
        summary_size: Token limit quoted to the model in the instructions.

    Returns:
        A two-element list: the system instructions followed by the user
        message carrying ``summaries_joined``.
    """
    instructions = f"""
    A less powerful GPT model generated summaries of different chunks of a book.

    Because of the way that the summaries are generated, they may not be perfect. Please review them
    and synthesize them into a single more detailed summary that you think is best.
    
    Strive to make your summary under a {summary_size} token limit.
    
    Please give the result in the original language of the text,
    and then if the original language is not English, please give an English translation of the summary,
    if the original language is English, please give a Chinese translation of the summary.""".strip()
    system_message = {"role": "system", "content": instructions}
    user_message = {"role": "user", "content": f"The summaries are as follows: {summaries_joined}"}
    return [system_message, user_message]

功能讲解

这里定义了两个函数,功能都非常简单:

  • chunk_prompt_messages(): 定义了对chunk进行summary的prompt;
  • synthesis_prompt_messages(): 定义了对每个chunk summary再进行一轮整合summary的prompt。

这里需要说明一下,在openai.ChatCompletion中,prompt被定义成messages,是一个由字典组成的列表,每个字典又包括"role"和"content"两个字段。具体的可以参考openai的接口文档:chat reference

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐