解析流式API输出的正确方法
你的代码已经基本实现了流式API的接收,但在解析流式输出时可能需要一些改进。以下是几个关键点和改进建议:
流式API输出的常见格式
流式API通常有以下几种输出格式:
- 纯文本逐行输出
 
- JSON格式逐行输出
 
- Server-Sent Events (SSE) 格式
 
改进后的代码
import json
import hashlib
from datetime import datetime
import requests
def get_token(prompt: str) -> str:
    current_date = datetime.now().strftime("%Y-%m-%d")
    md5_date = hashlib.md5(current_date.encode()).hexdigest()[:6]
    final_prompt = prompt + md5_date
    token = hashlib.md5(final_prompt.encode()).hexdigest()
    return token
def send_request(prompt: str):
    token = get_token(prompt)
    
    payload = json.dumps({
        "prompt": prompt,
        "token": token,
        "stream": True  
    })
    
    headers = {
        'ca': '',  
        'Content-Type': 'application/json'
    }
    
    url = "https://ai.coludai.cn/api/chat"
    try:
        with requests.post(url, data=payload, headers=headers, stream=True) as response:
            if response.status_code == 200:
                buffer = ""
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        # 解码数据并添加到缓冲区
                        buffer += chunk.decode("utf-8")
                        
                        # 处理可能的多行数据
                        while "\n" in buffer:
                            line, buffer = buffer.split("\n", 1)
                            line = line.strip()
                            if line:
                                try:
                                    # 尝试解析为JSON
                                    data = json.loads(line)
                                    # 假设响应中有"text"字段
                                    print(data.get("text", ""), end="", flush=True)
                                except json.JSONDecodeError:
                                    # 如果不是JSON,直接输出
                                    print(line, end="", flush=True)
                # 输出缓冲区剩余内容
                if buffer:
                    print(buffer, end="", flush=True)
                print()  # 最后换行
            else:
                print(f"请求失败,状态码:{response.status_code}")
    except Exception as e:
        print(f"请求发生错误: {str(e)}")
if __name__ == "__main__":
    while True:
        prompt = input("请输入你的问题:")
        if prompt.lower() in ("exit", "quit"):
            break
        send_request(prompt)
关键改进点
- 使用
iter_content代替iter_lines:更可靠地处理流数据 
- 添加缓冲区处理:正确处理不完整的行
 
- JSON解析尝试:尝试解析每行数据为JSON,提取有用信息
 
- 错误处理:添加了更完善的错误处理
 
- 输出优化:使用
end=""和flush=True实现流畅输出 
常见问题排查
如果输出仍然不正常,可以:
- 检查API文档确认响应格式
 
- 添加调试输出查看原始数据:
   print(f"Raw line: {line}")  # 在解析前添加
 
- 确认是否需要处理特殊前缀(如SSE格式的"data: "前缀)
 
针对不同格式的处理
如果是Server-Sent Events (SSE)格式,可以这样处理:
if line.startswith("data: "):
    line = line[6:].strip()
如果是纯JSON流,确保正确处理每个JSON对象。
希望这些改进能帮助你正确解析流式输出!