抱脸部署 Grok2API 教程

Reno 于 2025-03-24 发布

介绍

原项目 xLmiler/grok2api_python

由于 grok 官网更新了验证机制,添加了 x-statsig-id 请求头,原项目目前已失效,博主根据该项目更新了抱脸部署的教程

感谢 3806080717 提供了 x-statsig-id 接口服务

部署

点击这里 创建空间 选择 Docker 然后在Files添加或者修改下面提到的环境变量和文件

环境变量

变量名 示例值 描述
PROXY socks5://**** 可选,代理地址,支持 https 和 socks5
STATS_ID_URL https://**** 可选,x-statsig-id 接口地址,默认 https://rui.soundai.ee/x.php
IS_CUSTOM_SSO false 可选,开启单SSO模式,默认 false
SSO eyJ****,eyJ**** 关闭单SSO模式时必填,取自Grok官网Cookie的sso值
API_KEY sk-12345 关闭单SSO模式时必填,默认 sk-123456,开启单SSO模式时选填
PICGO_KEY chv_**** 绘图必选,Picgo 图床密钥,与 TUMY_KEY 二选一
TUMY_KEY   绘图必选,TUMY图床密钥,与 PICGO_KEY 二选一
CF_CLEARANCE cf_clearance=xxxxxx 可选,Cloudflare 免托管凭据, 取自Grok官网Cookie的cf_clearance值
IS_TEMP_CONVERSATION true 可选,开启临时对话,默认 true
SHOW_THINKING true 可选,显示思考过程,默认false
ISSHOW_SEARCH_RESULTS true 可选,显示搜索内容,默认 true
MANAGER_SWITCH false 可选,开启管理后台,默认false
ADMINPASSWORD password 可选,管理员密码,默认空值

配置文件

Dockerfile

# Slim Python base; app runs as an unprivileged user on the HF Spaces port 7860.
FROM python:3.10-slim

# Non-root user (uid 1000) as required by Hugging Face Spaces.
RUN useradd -m -u 1000 -s /bin/bash grok

RUN pip install --no-cache-dir flask requests curl_cffi werkzeug loguru python-dotenv

WORKDIR /home/grok/app

COPY --chown=grok:grok . .

USER grok

EXPOSE 7860

# Exec form; the original was missing the closing "]" and would not build.
CMD ["python", "main.py"]

README.md

---
layout: mypost
title: Grok2API
emoji: 🦀
colorFrom: yellow
colorTo: red
sdk: docker
pinned: false
---

main.py

import os
import json
import uuid
import time
import base64
import sys
import inspect
import secrets
import re
from loguru import logger
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Any

import requests
from flask import Flask, request, Response, jsonify, stream_with_context, render_template, redirect, session
from curl_cffi import requests as curl_requests
from werkzeug.middleware.proxy_fix import ProxyFix

# Chat roles and message/content-part tags used throughout request/response handling.
ROLE_ASSISTANT = "assistant"
ROLE_USER = "user"
MESSAGE_TAG_FINAL = "final"  # marks Grok's final (post-thinking) answer fragments
IMAGE_TYPE_TEXT = "text"  # OpenAI content-part type for text segments
IMAGE_TYPE_URL = "image_url"  # OpenAI content-part type for image segments

class Logger:
    """Thin wrapper around loguru that injects the real caller location.

    loguru would otherwise report this wrapper module as the call site, so
    each public method captures the caller's filename/function/lineno via
    frame inspection and binds them into the record's ``extra`` fields,
    which the format string below renders.
    """

    def __init__(self, level="INFO", colorize=True, format=None):
        # Replace loguru's default sink with a single stderr sink.
        logger.remove()
        if format is None:
            format = (
                "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
                "<level>{level: <8}</level> | "
                "<cyan>{extra[filename]}</cyan>:<cyan>{extra[function]}</cyan>:<cyan>{extra[lineno]}</cyan> | "
                "<level>{message}</level>"
            )
        logger.add(sys.stderr, level=level, format=format, colorize=colorize, backtrace=True, diagnose=True)
        self.logger = logger

    def _get_caller_info(self):
        """Return filename/function/lineno of the public method's caller.

        NOTE: walks exactly two frames up (``f_back.f_back``), so it must only
        be called directly from the public wrapper methods below — adding an
        intermediate call would report the wrong frame.
        """
        frame = inspect.currentframe()
        try:
            caller_frame = frame.f_back.f_back
            return {
                'filename': os.path.basename(caller_frame.f_code.co_filename),
                'function': caller_frame.f_code.co_name,
                'lineno': caller_frame.f_lineno,
            }
        finally:
            # Break the frame reference cycle promptly (per inspect docs).
            del frame

    def info(self, message, source="API"):
        """Log an INFO message tagged with a component *source*."""
        self.logger.bind(**self._get_caller_info()).info(f"[{source}] {message}")

    def error(self, message, source="API"):
        """Log an ERROR; Exception instances get a full traceback via exception()."""
        caller_info = self._get_caller_info()
        if isinstance(message, Exception):
            self.logger.bind(**caller_info).exception(f"[{source}] {str(message)}")
        else:
            self.logger.bind(**caller_info).error(f"[{source}] {message}")

    def warning(self, message, source="API"):
        """Log a WARNING message tagged with a component *source*."""
        self.logger.bind(**self._get_caller_info()).warning(f"[{source}] {message}")

    def debug(self, message, source="API"):
        """Log a DEBUG message tagged with a component *source*."""
        self.logger.bind(**self._get_caller_info()).debug(f"[{source}] {message}")

# Module-level singleton; from here on `logger` refers to the wrapper above,
# shadowing the loguru instance imported at the top of the file.
logger = Logger(level="INFO")
# Writable working directory; token status is persisted here between restarts.
DATA_DIR = Path("/home/grok/app")
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Runtime configuration, resolved once at import time from environment variables.
CONFIG = {
    # Public model names exposed by this API -> Grok's internal model names.
    "MODELS": {
        'grok-2': 'grok-latest', 'grok-2-imageGen': 'grok-latest', 'grok-2-search': 'grok-latest',
        "grok-3": "grok-3", "grok-3-search": "grok-3", "grok-3-imageGen": "grok-3",
        "grok-3-deepsearch": "grok-3", "grok-3-deepersearch": "grok-3", "grok-3-reasoning": "grok-3"
    },
    "API": {
        "IS_TEMP_CONVERSATION": os.environ.get("IS_TEMP_CONVERSATION", "true").lower() == "true",
        "IS_CUSTOM_SSO": os.environ.get("IS_CUSTOM_SSO", "false").lower() == "true",
        "BASE_URL": os.environ.get("GROK_BASE_URL", "https://grok.com"),
        "STATS_ID_URL": os.environ.get("STATS_ID_URL", "https://rui.soundai.ee/x.php"),
        "API_KEY": os.environ.get("API_KEY", "sk-123456"),
        "PICGO_KEY": os.environ.get("PICGO_KEY"),
        "TUMY_KEY": os.environ.get("TUMY_KEY"),
        "RETRY_TIME": 1000,  # base retry delay in milliseconds
        "PROXY": os.environ.get("PROXY")
    },
    "ADMIN": {
        "MANAGER_SWITCH": os.environ.get("MANAGER_SWITCH"),
        "PASSWORD": os.environ.get("ADMINPASSWORD")
    },
    "SERVER": {
        "CF_CLEARANCE": os.environ.get("CF_CLEARANCE"),
        "PORT": int(os.environ.get("PORT", 7860))
    },
    "RETRY": {"MAX_ATTEMPTS": 2},
    "TOKEN_STATUS_FILE": str(DATA_DIR / "token_status.json"),
    # Parsed case-insensitively like the other boolean flags. Previously this
    # was a case-sensitive `== "true"` comparison with no default, so values
    # like "True"/"TRUE" were silently treated as false. Default stays false.
    "SHOW_THINKING": os.environ.get("SHOW_THINKING", "false").lower() == "true",
    "ISSHOW_SEARCH_RESULTS": os.environ.get("ISSHOW_SEARCH_RESULTS", "true").lower() == "true"
}

# Browser-impersonation headers sent with every request to grok.com. These
# should stay consistent with the curl_cffi impersonate="chrome133a" profile
# used for the actual requests elsewhere in this file.
DEFAULT_HEADERS = {
    'Accept': '*/*', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Accept-Encoding': 'gzip, deflate, br, zstd',
    'Content-Type': 'text/plain;charset=UTF-8', 'Connection': 'keep-alive', 'Origin': CONFIG["API"]["BASE_URL"],
    'Priority': 'u=1, i', 'Sec-Ch-Ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
    'Sec-Ch-Ua-Mobile': '?0', 'Sec-Ch-Ua-Platform': '"macOS"', 'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
    'Baggage': 'sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c'
}

@dataclass
class RequestContext:
    """Mutable per-request state shared by the stream/non-stream handlers."""
    # True while we are inside an open <think> block of the response.
    is_thinking: bool = False
    # True once any image-generation metadata has been seen for this request.
    is_img_gen: bool = False
    # True after the generated images have been processed exactly once.
    is_img_gen_done: bool = False
    image_urls: List[str] = field(default_factory=list)  # NOTE(review): appears unused in this file — confirm before removing


class AuthTokenManager:
    """Tracks Grok SSO tokens per model pool, their rate-limit state, and persistence.

    Tokens live in ``token_model_map`` keyed by a normalized model-pool name
    (e.g. "grok-2", "grok-3", "grok-3-deepsearch"). Tokens that hit their
    rate limit are parked in ``expired_tokens`` and re-activated by a
    background daemon thread once their rate-limit window has elapsed.
    Per-SSO usage/validity status is persisted to TOKEN_STATUS_FILE so it
    survives restarts.
    """

    def __init__(self):
        # model pool -> [{"token": cookie string, "AddedTime": epoch ms}, ...]
        self.token_model_map = {}
        # parked tokens: set of (token, model_pool, parked_at_epoch_s, window_s)
        self.expired_tokens = set()
        # sso value -> model pool -> {"isValid", "invalidatedTime", "totalRequestCount"}
        self.token_status_map = {}
        self.model_config = {model: {} for model in CONFIG["MODELS"]}
        # True once the background reset thread is running (started lazily).
        self.token_reset_switch = False
        self.token_reset_timer = None  # NOTE(review): never assigned elsewhere in this file — confirm
        self.load_token_status()

    def save_token_status(self):
        """Persist the per-SSO status map to TOKEN_STATUS_FILE (best effort)."""
        try:
            with open(CONFIG["TOKEN_STATUS_FILE"], 'w', encoding='utf-8') as f:
                save_data = {}
                for sso, models in self.token_status_map.items():
                    save_data[sso] = {}
                    for model, status in models.items():
                        # Keep only the serializable status fields.
                        save_data[sso][model] = {
                            "isValid": status.get("isValid"),
                            "invalidatedTime": status.get("invalidatedTime"),
                            "totalRequestCount": status.get("totalRequestCount", 0)
                        }
                json.dump(save_data, f, indent=2, ensure_ascii=False)
            logger.debug("Token status saved to file.", "TokenManager")
        except Exception as error:
            logger.error(f"Failed to save token status: {str(error)}", "TokenManager")

    def load_token_status(self):
        """Restore the per-SSO status map from disk, if the file exists.

        Note: this only restores status/usage counters; the actual token pool
        is rebuilt separately via add_token()/set_token().
        """
        try:
            token_status_file = Path(CONFIG["TOKEN_STATUS_FILE"])
            if token_status_file.exists():
                with open(token_status_file, 'r', encoding='utf-8') as f:
                    self.token_status_map = json.load(f)
                logger.info("Loaded token status from file.", "TokenManager")
        except Exception as error:
            logger.error(f"Failed to load token status: {str(error)}", "TokenManager")

    def add_token(self, token,isinitialization=False):
        """Register *token* (a "sso=..." cookie string) in every model pool.

        Skips pools that already contain the token. When called during startup
        (isinitialization=True), the save-to-disk step is deferred.
        """
        # Extract the sso cookie value, used as the status-map key.
        sso = token.split("sso=")[1].split(";")[0]
        normalized_model_pools = {self.normalize_model_name(m) for m in self.model_config.keys()}

        for model_pool in normalized_model_pools:
            if model_pool not in self.token_model_map:
                self.token_model_map[model_pool] = []
            if sso not in self.token_status_map:
                self.token_status_map[sso] = {}

            if not any(entry["token"] == token for entry in self.token_model_map[model_pool]):
                self.token_model_map[model_pool].append({"token": token, "AddedTime": int(time.time() * 1000)})
                if model_pool not in self.token_status_map[sso]:
                    self.token_status_map[sso][model_pool] = {
                        "isValid": True, "invalidatedTime": None, "totalRequestCount": 0
                    }
        if not isinitialization:
            self.save_token_status()

    def set_token(self, token):
        """Replace the entire pool with this single token (single-SSO mode)."""
        normalized_model_pools = {self.normalize_model_name(m) for m in self.model_config.keys()}
        self.token_model_map = {pool: [{"token": token, "AddedTime": int(time.time() * 1000)}] for pool in normalized_model_pools}
        sso = token.split("sso=")[1].split(";")[0]
        self.token_status_map[sso] = {pool: {
            "isValid": True, "invalidatedTime": None, "totalRequestCount": 0
        } for pool in normalized_model_pools}


    def delete_token(self, token):
        """Remove *token* from every pool and drop its SSO status entry."""
        try:
            sso = token.split("sso=")[1].split(";")[0]
            for model_pool in list(self.token_model_map.keys()):
                self.token_model_map[model_pool] = [e for e in self.token_model_map[model_pool] if e["token"] != token]
            if sso in self.token_status_map:
                del self.token_status_map[sso]
            self.save_token_status()
            logger.info(f"Token successfully removed: {token}", "TokenManager")
            return True
        except Exception as error:
            # NOTE(review): missing the "TokenManager" source tag used elsewhere.
            logger.error(f"Token deletion failed: {str(error)}")
            return False

    def _query_rate_limit(self, cookie: str, model_id: str, statsig_id: str) -> Dict[str, Any]:
        """Ask grok.com how many requests remain for *cookie* on *model_id*.

        Raises on any transport/HTTP error; callers treat that as "token unusable".
        """
        url = f"{CONFIG['API']['BASE_URL']}/rest/rate-limits"

        # Map the public model suffix to Grok's requestKind enum.
        if "reasoning" in model_id: requestKind = "REASONING"
        elif "deepersearch" in model_id: requestKind = "DEEPERSEARCH"
        elif "deepsearch" in model_id: requestKind = "DEEPSEARCH"
        else: requestKind = "DEFAULT"

        payloadModelName = CONFIG["MODELS"][model_id]

        # Append the optional Cloudflare clearance cookie when configured.
        full_cookie = f"{cookie};{CONFIG['SERVER']['CF_CLEARANCE']}" if CONFIG["SERVER"].get('CF_CLEARANCE') else cookie
        headers = {**DEFAULT_HEADERS, 'Content-Type': 'application/json', 'Referer': f"{CONFIG['API']['BASE_URL']}/", 'x-statsig-id': statsig_id, 'Cookie': full_cookie}
        payload = {"requestKind": requestKind, "modelName": payloadModelName}

        try:
            response = curl_requests.post(url, headers=headers, json=payload, impersonate="chrome133a", timeout=10, **Utils.get_proxy_options())
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error querying rate limit for token ...{cookie[-10:]} ({model_id}): {e}", "TokenManager")
            raise

    def get_next_token_for_model(self, model_id: str, statsig_id: str, is_return: bool = False):
        """Return the first token with remaining quota for *model_id*, or None.

        Probes each pooled token's live rate limit; exhausted or unqueryable
        tokens are moved to the expired set (to be revived after their window).
        With is_return=True, returns the head token without any probing.
        """
        normalized_model = self.normalize_model_name(model_id)
        if normalized_model not in self.token_model_map or not self.token_model_map[normalized_model]: return None
        if is_return: return self.token_model_map[normalized_model][0]["token"]

        # Iterate over a snapshot: move_to_expired() mutates the pool list.
        for token_entry in list(self.token_model_map[normalized_model]):
            token = token_entry["token"]
            sso = token.split("sso=")[1].split(";")[0]
            try:
                rate_limit_data = self._query_rate_limit(token, model_id, statsig_id)
                remaining_queries = rate_limit_data.get("remainingQueries", 0)

                if remaining_queries > 0:
                    logger.info(f"Token ...{token[-10:]} has {remaining_queries} requests remaining for {model_id}. Using it.", "TokenManager")
                    if sso in self.token_status_map and normalized_model in self.token_status_map[sso]:
                        self.token_status_map[sso][normalized_model].update({"isValid": True, "totalRequestCount": self.token_status_map[sso][normalized_model].get("totalRequestCount", 0) + 1})
                    if not self.token_reset_switch: self.start_token_reset_process(); self.token_reset_switch = True
                    self.save_token_status()
                    return token
                else:
                    logger.warning(f"Token ...{token[-10:]} for {model_id} has 0 requests. Moving to expired list.", "TokenManager")
                    if sso in self.token_status_map and normalized_model in self.token_status_map[sso]:
                        self.token_status_map[sso][normalized_model].update({"isValid": False, "invalidatedTime": int(time.time() * 1000)})

                    # Park for the server-reported window (fallback: 1 hour).
                    window_size_seconds = rate_limit_data.get("windowSizeSeconds", 3600)
                    self.move_to_expired(normalized_model, token, window_size_seconds)
                    self.save_token_status()
                    continue
            except Exception:
                logger.error(f"Failed to query rate limit for token ...{token[-10:]}. Assuming invalid, moving to expired list with default timeout.", "TokenManager")
                self.move_to_expired(normalized_model, token, 3600)
                continue

        logger.warning(f"No available tokens with remaining requests found for model {model_id}.", "TokenManager")
        return None

    def move_to_expired(self, model_id: str, token: str, window_size_seconds: int):
        """Remove *token* from the pool and park it until its window elapses."""
        model_tokens = self.token_model_map.get(model_id, [])
        token_index = next((i for i, entry in enumerate(model_tokens) if entry["token"] == token), -1)
        if token_index != -1:
            model_tokens.pop(token_index)
            self.expired_tokens.add((token, model_id, time.time(), window_size_seconds))
            # Lazily start the reviver thread the first time a token expires.
            if not self.token_reset_switch: self.start_token_reset_process(); self.token_reset_switch = True

    def normalize_model_name(self, model):
        """Collapse variants like "grok-3-search" to their pool name "grok-3".

        deepsearch/reasoning variants keep their full name (separate pools).
        """
        if model.startswith('grok-') and 'deepsearch' not in model and 'reasoning' not in model:
            return '-'.join(model.split('-')[:2])
        return model

    def get_token_count_for_model(self, model_id):
        """Number of currently pooled (non-expired) tokens for *model_id*."""
        return len(self.token_model_map.get(self.normalize_model_name(model_id), []))

    def start_token_reset_process(self):
        """Start the daemon thread that revives parked tokens once per minute."""
        def reset_expired_tokens_loop():
            while True:
                now = time.time()
                tokens_to_re_add = set()
                # NOTE(review): expired_tokens may be mutated concurrently by
                # move_to_expired() on request threads; iteration here is not
                # lock-protected — confirm whether a lock is needed.
                for token_info in self.expired_tokens:
                    token, model, expired_timestamp, window_size_seconds = token_info
                    if now - expired_timestamp >= window_size_seconds:
                        # Re-add only if not already back in the pool.
                        if not any(e["token"] == token for e in self.token_model_map.get(model, [])):
                            if model not in self.token_model_map: self.token_model_map[model] = []
                            self.token_model_map[model].append({"token": token, "AddedTime": int(now * 1000)})
                        sso = token.split("sso=")[1].split(";")[0]
                        if sso in self.token_status_map and model in self.token_status_map[sso]:
                            self.token_status_map[sso][model].update({"isValid": True, "invalidatedTime": None, "totalRequestCount": 0})
                        tokens_to_re_add.add(token_info)

                if tokens_to_re_add:
                    self.expired_tokens -= tokens_to_re_add
                    self.save_token_status()
                    logger.info(f"Re-activated {len(tokens_to_re_add)} token(s) after rate limit window.", "TokenManager")

                time.sleep(60)

        import threading
        timer_thread = threading.Thread(target=reset_expired_tokens_loop)
        timer_thread.daemon = True
        timer_thread.start()

    def get_all_tokens(self):
        """Return the de-duplicated list of every pooled token."""
        return list({e["token"] for model_tokens in self.token_model_map.values() for e in model_tokens})

    def get_current_token(self, model_id):
        """Return the head token of *model_id*'s pool without probing, or None."""
        normalized_model = self.normalize_model_name(model_id)
        if normalized_model not in self.token_model_map or not self.token_model_map[normalized_model]: return None
        return self.token_model_map[normalized_model][0]["token"]

    # Raw status map, consumed by the admin UI.
    def get_token_status_map(self): return self.token_status_map

class Utils:
    """Stateless helpers: statsig-id retrieval, search-result formatting, proxy parsing."""

    @staticmethod
    def get_statsig_id():
        """Fetch a fresh x-statsig-id from the configured helper service.

        Returns:
            The statsig id string.
        Raises:
            ValueError: if the service is unreachable or returns no id
                (wraps the underlying exception as the cause).
        """
        try:
            response = curl_requests.get(CONFIG["API"]["STATS_ID_URL"], impersonate="chrome133a", timeout=10, **Utils.get_proxy_options())
            response.raise_for_status()
            statsig_id = response.json().get("x_statsig_id")
            if not statsig_id: raise ValueError("Invalid statsig ID received.")
            logger.debug(f"Successfully obtained x-statsig-id: {statsig_id}", "Utils")
            return statsig_id
        except Exception as e:
            raise ValueError(f"Could not retrieve x-statsig-id: {e}") from e

    @staticmethod
    def organize_search_results(search_results):
        """Render Grok web-search results as collapsible markdown <details> blocks.

        Returns '' when there are no results.
        """
        if not search_results or 'results' not in search_results: return ''
        results = [f"\r\n<details><summary>资料[{i}]: {r.get('title', '未知标题')}</summary>\r\n{r.get('preview', '无预览内容')}\r\n\n[Link]({r.get('url', '#')})\r\n</details>" for i, r in enumerate(search_results['results'])]
        return '\n\n'.join(results)

    @staticmethod
    def get_proxy_options():
        """Build proxy kwargs for curl_cffi/requests from CONFIG["API"]["PROXY"].

        Returns:
            {} when no proxy is configured; for socks5 URLs with credentials,
            {"proxy": ..., "proxy_auth": (user, password)}; for plain socks5,
            {"proxy": ...}; otherwise a requests-style {"proxies": {...}}.
        """
        proxy = CONFIG["API"]["PROXY"]
        if not proxy: return {}
        if proxy.startswith("socks5://"):
            if '@' in proxy:
                # Split on the LAST '@' and the FIRST ':' so usernames or
                # passwords containing '@'/':' parse correctly. The previous
                # split('@') / split(':') raised ValueError for such values.
                auth_part, host_part = proxy.rsplit('@', 1)
                username, password = auth_part.split('://', 1)[1].split(':', 1)
                return {"proxy": f"socks5://{host_part}", "proxy_auth": (username, password)}
            return {"proxy": proxy}
        return {"proxies": {"https": proxy, "http": proxy}}

class GrokApiClient:
    """Builds and uploads Grok chat requests for one public model name."""

    def __init__(self, model_id):
        """Resolve *model_id* to Grok's internal model name.

        Raises:
            ValueError: if *model_id* is not in CONFIG["MODELS"].
        """
        if model_id not in CONFIG["MODELS"]:
            raise ValueError(f"Unsupported model: {model_id}")
        self.model_id = CONFIG["MODELS"][model_id]

    def get_image_type(self, base64_string):
        """Infer mime type/file name from a data-URI prefix (default: JPEG)."""
        mime_type = 'image/jpeg'
        if 'data:image' in base64_string:
            match = re.search(r'data:([a-zA-Z0-9]+\/[a-zA-Z0-9-.+]+);base64,', base64_string)
            if match: mime_type = match.group(1)
        extension = mime_type.split('/')[1]
        return {"mimeType": mime_type, "fileName": f"image.{extension}"}

    def _upload_file(self, upload_data, headers, url_path):
        """POST *upload_data* to grok.com and return its fileMetadataId ('' if absent).

        Raises on any transport/HTTP failure (after logging).
        """
        try:
            logger.info(f"Sending file upload request to {url_path}", "Server")
            proxy_options = Utils.get_proxy_options()
            response = curl_requests.post(
                f"{CONFIG['API']['BASE_URL']}{url_path}",
                headers=headers, json=upload_data,
                impersonate="chrome133a", **proxy_options
            )
            response.raise_for_status()
            result = response.json()
            logger.info(f"File upload successful: {result}", "Server")
            return result.get("fileMetadataId", "")
        except Exception as error:
            logger.error(f"File upload failed: {error}", "Server")
            raise

    def upload_base64_file(self, message, headers):
        """Upload *message* text as a message.txt attachment; return its file id."""
        message_base64 = base64.b64encode(message.encode('utf-8')).decode('utf-8')
        upload_data = {"fileName": "message.txt", "fileMimeType": "text/plain", "content": message_base64}
        return self._upload_file(upload_data, headers, "/rest/app-chat/upload-file")

    def upload_base64_image(self, base64_data, headers):
        """Upload a base64 (optionally data-URI) image; return its file id."""
        # Strip the data-URI header if present; Grok wants raw base64.
        image_buffer = base64_data.split(',')[1] if 'data:image' in base64_data else base64_data
        image_info = self.get_image_type(base64_data)
        upload_data = {
            "rpc": "uploadFile",
            "req": {"fileName": image_info["fileName"], "fileMimeType": image_info["mimeType"], "content": image_buffer}
        }
        return self._upload_file(upload_data, headers, "/api/rpc")

    def prepare_chat_request(self, request_data, headers):
        """Flatten an OpenAI-style chat request into Grok's single-message payload.

        Messages are concatenated as "ROLE: text" lines; images in the last
        message are uploaded as attachments; conversations longer than 40000
        characters are uploaded as a text-file attachment instead.

        Raises:
            ValueError: for streamed imageGen without an image-host key, a
                non-user last message on single-turn models, or empty content.
        """
        model = request_data["model"]
        if 'imageGen' in model and not CONFIG["API"]["PICGO_KEY"] and not CONFIG["API"]["TUMY_KEY"] and request_data.get("stream", False):
            raise ValueError("Streaming for image generation models requires PICGO or TUMY key!")

        todo_messages = request_data["messages"]
        # These models only consume the latest user turn.
        if model in ['grok-2-imageGen', 'grok-3-imageGen', 'grok-3-deepsearch']:
            if todo_messages[-1]["role"] != 'user':
                raise ValueError('The last message must be from the user for this model!')
            todo_messages = [todo_messages[-1]]

        file_attachments, messages, last_content = [], '', ''
        last_role, message_length, convert_to_file = None, 0, False

        def remove_think_tags(text):
            # Strip prior <think> blocks and inline base64 images from history.
            text = re.sub(r'<think>[\s\S]*?<\/think>', '', text, flags=re.DOTALL).strip()
            return re.sub(r'!\[image\]\(data:.*?base64,.*?\)', '[图片]', text)

        def process_content(content):
            # Normalize OpenAI content (string or list of parts) into plain text.
            if isinstance(content, list):
                return '\n'.join(
                    '[图片]' if item["type"] == IMAGE_TYPE_URL else remove_think_tags(item.get("text", ""))
                    for item in content
                )
            return remove_think_tags(str(content)) if content else ''

        for i, current in enumerate(todo_messages):
            # Any non-assistant role (user, system, ...) is folded into USER.
            role = ROLE_ASSISTANT if current["role"] == ROLE_ASSISTANT else ROLE_USER
            is_last_message = i == len(todo_messages) - 1

            # Only images from the final message are uploaded as attachments.
            if is_last_message and isinstance(current.get("content"), list):
                for item in current["content"]:
                    if item["type"] == IMAGE_TYPE_URL:
                        try:
                            file_id = self.upload_base64_image(item["image_url"]["url"], headers)
                            if file_id: file_attachments.append(file_id)
                        except Exception:
                           logger.warning("Failed to upload an image, skipping it.", "RequestPrep")

            text_content = process_content(current.get("content", ""))

            # Once the transcript exceeds 40000 chars, ship what we have as a
            # file attachment (done at most once) and continue with fresh text.
            if message_length >= 40000 and not convert_to_file:
                convert_to_file = True
                try:
                    file_id = self.upload_base64_file(messages, headers)
                    if file_id: file_attachments.insert(0, file_id)
                    messages = ""
                except Exception:
                    logger.error("Failed to upload long conversation as file.", "RequestPrep")

            if text_content or (is_last_message and file_attachments):
                if role == last_role and text_content:
                    # Merge consecutive same-role turns into one "ROLE:" line.
                    # NOTE(review): rfind locates the LAST "ROLE:" occurrence;
                    # confirm this is safe if a message body contains "USER:".
                    last_content += '\n' + text_content
                    messages = messages[:messages.rfind(f"{role.upper()}:")] + f"{role.upper()}: {last_content}\n"
                else:
                    messages += f"{role.upper()}: {text_content or '[图片]'}\n"
                    last_content = text_content
                    last_role = role
            message_length += len(text_content)

        if not messages.strip():
            if file_attachments:
                messages = 'Based on the attached file(s), please respond.'
            else:
                raise ValueError('Message content is empty!')

        deepsearchPreset = ''
        if model == 'grok-3-deepsearch': deepsearchPreset = 'default'
        elif model == 'grok-3-deepersearch': deepsearchPreset = 'deeper'
        search = 'search' in model

        # Grok accepts at most 4 file attachments per message.
        return {
            "temporary": CONFIG["API"]["IS_TEMP_CONVERSATION"], "modelName": self.model_id,
            "message": messages.strip(), "fileAttachments": file_attachments[:4], "imageAttachments": [],
            "disableArtifact": False, "isFromGrokFiles": False, 
            "disableSearch": False, "enableImageGeneration": True, "returnImageBytes": False,
            "returnRawGrokInXaiRequest": False, "enableImageStreaming": False, "imageGenerationCount": 1,
            "forceConcise": False, "toolOverrides": {"imageGen": 'imageGen' in model, "webSearch": search,
            "xSearch": search, "xMediaSearch": search, "trendsSearch": search, "xPostAnalyze": search},
            "enableSideBySide": True, "sendFinalMetadata": True, "customPersonality": "",
            "deepsearchPreset": deepsearchPreset, "isReasoning": model == 'grok-3-reasoning',
            "disableTextFollowUps": True
        }


class MessageProcessor:
    """Builds OpenAI-compatible chat-completion payloads."""

    @staticmethod
    def create_chat_response(message, model, is_stream=False):
        """Wrap *message* in an OpenAI-style completion dict (chunk when streaming)."""
        payload = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "created": int(time.time()),
            "model": model,
        }
        if is_stream:
            payload["object"] = "chat.completion.chunk"
            payload["choices"] = [{"index": 0, "delta": {"content": message}}]
        else:
            payload["object"] = "chat.completion"
            payload["choices"] = [{
                "index": 0,
                "message": {"role": ROLE_ASSISTANT, "content": message},
                "finish_reason": "stop",
            }]
            payload["usage"] = None
        return payload

def process_model_response(response_data: Dict[str, Any], model: str, req_ctx: RequestContext) -> str:
    """Translate one raw Grok response fragment into output text.

    Search models may emit their web-search results as a <think> block;
    deepsearch models wrap step output in <think>...</think> driven by
    messageStepId/messageTag; reasoning models do the same driven by
    isThinking. req_ctx tracks whether a <think> block is currently open.
    """
    token = response_data.get("token")
    search_results = response_data.get("webSearchResults")

    # Search-capable models: surface organized results as a think block.
    if 'search' in model and search_results and CONFIG["ISSHOW_SEARCH_RESULTS"]:
        organized = Utils.organize_search_results(search_results)
        return f"\r\n<think>{organized}</think>\r\n"

    plain_token = token if token is not None else ""

    if model in ('grok-3-deepsearch', 'grok-3-deepersearch'):
        step_id = response_data.get("messageStepId")
        tag = response_data.get("messageTag")
        if step_id and not CONFIG["SHOW_THINKING"]:
            return ""
        if step_id and not req_ctx.is_thinking:
            # First thinking step: open the think block.
            req_ctx.is_thinking = True
            return f"<think>{token or ''}"
        if not step_id and req_ctx.is_thinking and tag == MESSAGE_TAG_FINAL:
            # Final answer arriving: close the think block.
            req_ctx.is_thinking = False
            return f"</think>{token or ''}"
        if (step_id and req_ctx.is_thinking and tag == ROLE_ASSISTANT) or tag == MESSAGE_TAG_FINAL:
            return token
        if req_ctx.is_thinking and isinstance(token, dict) and token.get("action") == "webSearch":
            # Tool-call token: emit only the search query being issued.
            return token.get("action_input", {}).get("query", "")
        if req_ctx.is_thinking and search_results:
            return Utils.organize_search_results(search_results)
        return plain_token

    if model == 'grok-3-reasoning':
        thinking_now = response_data.get("isThinking")
        if thinking_now and not CONFIG["SHOW_THINKING"]:
            return ""
        if thinking_now and not req_ctx.is_thinking:
            req_ctx.is_thinking = True
            return f"<think>{token or ''}"
        if not thinking_now and req_ctx.is_thinking:
            req_ctx.is_thinking = False
            return f"</think>{token or ''}"
        return plain_token

    return plain_token

def handle_image_responses(image_urls: List[str]) -> str:
    """Download generated images and return them as markdown image links.

    For each URL: fetch from assets.grok.com with the current grok-2 token's
    cookie, then try (in order) the PicGo host, the TUMY host, and finally an
    inline base64 data URI. Each image is retried up to MAX_ATTEMPTS times
    with a linearly increasing backoff; on final failure a plain error line
    is emitted for that image instead.
    """
    processed_images = []
    if not image_urls:
        return ""
    for image_url in image_urls:
        for attempt in range(CONFIG["RETRY"]["MAX_ATTEMPTS"]):
            try:
                cookie = token_manager.get_current_token('grok-2') or ''
                if CONFIG["SERVER"].get('CF_CLEARANCE'): cookie += f";{CONFIG['SERVER']['CF_CLEARANCE']}"
                
                response = curl_requests.get(f"https://assets.grok.com/{image_url}", headers={**DEFAULT_HEADERS, "Cookie": cookie}, impersonate="chrome133a", **Utils.get_proxy_options())
                response.raise_for_status()
                image_buffer = response.content
                
                # Preferred host: PicGo (if configured). `break` ends the retry loop.
                if CONFIG["API"]["PICGO_KEY"]:
                    files, headers = {'source': ('image.jpg', image_buffer, 'image/jpeg')}, {"X-API-Key": CONFIG["API"]["PICGO_KEY"]}
                    upload_resp = requests.post("https://www.picgo.net/api/1/upload", files=files, headers=headers)
                    if upload_resp.ok: processed_images.append(f"![image]({upload_resp.json()['image']['url']})"); break
                
                # Second choice: TUMY host (also falls through on upload failure).
                if CONFIG["API"]["TUMY_KEY"]:
                    files, headers = {'file': ('image.jpg', image_buffer, 'image/jpeg')}, {"Accept": "application/json", 'Authorization': f"Bearer {CONFIG['API']['TUMY_KEY']}"}
                    upload_resp = requests.post("https://tu.my/api/v1/upload", files=files, headers=headers)
                    if upload_resp.ok: processed_images.append(f"![image]({upload_resp.json()['data']['links']['url']})"); break

                # Fallback: embed the image inline as a base64 data URI.
                base64_image = base64.b64encode(image_buffer).decode('utf-8')
                content_type = response.headers.get('content-type', 'image/jpeg')
                processed_images.append(f"![image](data:{content_type};base64,{base64_image})")
                break
            except Exception as error:
                logger.error(f"Image ({image_url}) processing attempt {attempt+1} failed: {error}", "ImageHandler")
                if attempt + 1 == CONFIG["RETRY"]["MAX_ATTEMPTS"]:
                    processed_images.append(f"Failed to generate or upload image: {image_url}")
                # Linear backoff: RETRY_TIME is in ms, scaled by attempt number.
                time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * (attempt + 1))
    return "\n".join(processed_images)

def handle_non_stream_response(response, model, req_ctx: RequestContext):
    """Drain a Grok streaming response and return the full text plus image markdown.

    Each line is a JSON fragment; text tokens are accumulated, and once the
    final modelResponse carries generatedImageUrls the images are processed
    exactly once. A Grok-side error payload short-circuits with a
    RateLimitError JSON string.
    """
    full_response_text = ""
    image_markdown = ""

    for chunk in response.iter_lines():
        if not chunk:
            continue
        # Pre-initialize so the error log below can never hit an unbound name
        # when chunk.decode() itself is what raised (previously a NameError
        # here masked the real decoding error).
        line = ""
        try:
            line = chunk.decode("utf-8").strip()
            if not line:
                continue
            line_json = json.loads(line)

            if line_json.get("error"):
                logger.error(json.dumps(line_json, indent=2), "Server")
                return json.dumps({"error": "RateLimitError"})

            response_data = line_json.get("result", {}).get("response")
            if not response_data:
                continue

            # Any image metadata marks this as an image-generation request.
            if response_data.get("imageDimensions") or response_data.get("imageAttachmentInfo"):
                req_ctx.is_img_gen = True

            text_token = process_model_response(response_data, model, req_ctx)
            if text_token:
                full_response_text += text_token

            model_response = response_data.get("modelResponse")
            if (
                req_ctx.is_img_gen
                and not req_ctx.is_img_gen_done
                and model_response
                and model_response.get("generatedImageUrls")
            ):
                logger.info(f"Final image data received. Processing {len(model_response['generatedImageUrls'])} images.", "Server")
                image_markdown = handle_image_responses(model_response["generatedImageUrls"])
                req_ctx.is_img_gen_done = True

        except Exception as e:
            logger.error(f"Error processing non-stream chunk '{line}': {e}", "Server")

    return full_response_text + image_markdown

def handle_stream_response(response, model, req_ctx: RequestContext):
    """Wrap a Grok streaming response as an SSE generator of OpenAI-style chunks.

    Each upstream JSON line is converted to a `data: {...}` chat-completion
    chunk; when image generation completes, the image markdown is emitted as
    one extra chunk. The stream always terminates with a `[DONE]` sentinel
    unless an upstream error aborts it early.

    Args:
        response: streaming HTTP response whose body is one JSON object per line.
        model: model identifier, forwarded to process_model_response.
        req_ctx: per-request state; image-generation flags are mutated here.

    Returns:
        A generator yielding SSE-formatted strings.
    """
    def generate():
        for chunk in response.iter_lines():
            if not chunk:
                continue
            # Predefine so the except handler can reference `line` even when
            # chunk.decode() itself raises (previously a latent NameError).
            line = ""
            try:
                line = chunk.decode("utf-8").strip()
                if not line:
                    continue
                line_json = json.loads(line)

                if line_json.get("error"):
                    logger.error(json.dumps(line_json, indent=2), "Server")
                    yield f"data: {json.dumps({'error': 'RateLimitError'})}\n\n"
                    return

                response_data = line_json.get("result", {}).get("response")
                if not response_data:
                    continue

                # Presence of image metadata marks this request as image generation.
                if response_data.get("imageDimensions") or response_data.get("imageAttachmentInfo"):
                    req_ctx.is_img_gen = True

                text_token = process_model_response(response_data, model, req_ctx)
                if text_token:
                    yield f"data: {json.dumps(MessageProcessor.create_chat_response(text_token, model, True))}\n\n"

                model_response = response_data.get("modelResponse")
                if (
                    req_ctx.is_img_gen
                    and not req_ctx.is_img_gen_done
                    and model_response
                    and model_response.get("generatedImageUrls")
                ):
                    logger.info(f"Final image data received in stream. Processing {len(model_response['generatedImageUrls'])} images.", "Server")
                    image_markdown = handle_image_responses(model_response["generatedImageUrls"])
                    if image_markdown:
                        yield f"data: {json.dumps(MessageProcessor.create_chat_response(image_markdown, model, True))}\n\n"
                    req_ctx.is_img_gen_done = True

            except Exception as e:
                logger.error(f"Error processing stream chunk '{line}': {e}", "Server")

        yield "data: [DONE]\n\n"
    return generate()

def initialization():
    """Load persisted token state and register SSO tokens from the environment.

    Reads the comma-separated ``SSO`` env var, registers each non-empty entry
    as a Grok cookie pair, persists the resulting token status, and logs the
    proxy configuration when one is set.
    """
    sso_array = os.environ.get("SSO", "").split(',')
    logger.info("Loading tokens...", "Server")
    token_manager.load_token_status()
    for sso in sso_array:
        # Tolerate whitespace around commas (e.g. "eyJ..., eyJ...") which
        # would otherwise silently produce a broken cookie value.
        sso = sso.strip()
        if sso:
            token_manager.add_token(f"sso-rw={sso};sso={sso}", isinitialization=True)
    token_manager.save_token_status()
    logger.info(f"Loaded {len(token_manager.get_all_tokens())} unique tokens.", "Server")
    if CONFIG["API"]["PROXY"]:
        logger.info(f"Proxy is set: {CONFIG['API']['PROXY']}", "Server")
    logger.info("Initialization complete.", "Server")

# Flask application setup.
app = Flask(__name__)
# Respect X-Forwarded-* headers when running behind a reverse proxy
# (e.g. the Hugging Face Spaces ingress).
app.wsgi_app = ProxyFix(app.wsgi_app)
# Session signing key: taken from FLASK_SECRET_KEY if set, otherwise generated
# per process — manager logins then do not survive a restart.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', secrets.token_hex(16))
# Preserve insertion order of keys in JSON responses (OpenAI-style payloads).
app.json.sort_keys = False

@app.route('/manager/login', methods=['GET', 'POST'])
def manager_login():
    """Serve the manager login form and validate a submitted password."""
    if not CONFIG["ADMIN"]["MANAGER_SWITCH"]:
        # Management console disabled: send visitors back to the root page.
        return redirect('/')
    if request.method != 'POST':
        return render_template('login.html', error=False)
    submitted = request.form.get('password')
    if submitted == CONFIG["ADMIN"]["PASSWORD"]:
        session['is_logged_in'] = True
        return redirect('/manager')
    return render_template('login.html', error=True)

def check_auth():
    """Return whether the current session has completed a manager login."""
    return session.get('is_logged_in', False)

@app.route('/manager')
def manager():
    """Serve the manager dashboard; unauthenticated users go to the login page."""
    if check_auth():
        return render_template('manager.html')
    return redirect('/manager/login')

@app.route('/manager/api/get')
def get_manager_tokens():
    """Return the status map of all managed SSO tokens (auth required)."""
    if check_auth():
        return jsonify(token_manager.get_token_status_map())
    return jsonify({"error": "Unauthorized"}), 401

@app.route('/manager/api/add', methods=['POST'])
def add_manager_token():
    """Register a new SSO token posted as JSON ``{"sso": "..."}`` (auth required)."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        sso_value = request.json.get('sso')
        if not sso_value:
            return jsonify({"error": "SSO token is required"}), 400
        # Grok expects both sso-rw and sso cookies carrying the same value.
        token_manager.add_token(f"sso-rw={sso_value};sso={sso_value}")
        return jsonify({"success": True})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500

@app.route('/manager/api/delete', methods=['POST'])
def delete_manager_token():
    """Remove an SSO token posted as JSON ``{"sso": "..."}`` (auth required)."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        sso_value = request.json.get('sso')
        if not sso_value:
            return jsonify({"error": "SSO token is required"}), 400
        # Tokens are stored as the full cookie pair, so delete by the same key.
        token_manager.delete_token(f"sso-rw={sso_value};sso={sso_value}")
        return jsonify({"success": True})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500

@app.route('/manager/api/cf_clearance', methods=['POST'])
def setCf_Manager_clearance():
    """Update the Cloudflare clearance cookie used on upstream requests (auth required)."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        clearance_value = request.json.get('cf_clearance')
        if not clearance_value:
            return jsonify({"error": "cf_clearance is required"}), 400
        # Stored in the live config; _create_request_headers appends it to cookies.
        CONFIG["SERVER"]['CF_CLEARANCE'] = clearance_value
        return jsonify({"success": True})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500

@app.route('/v1/models', methods=['GET'])
def get_models():
    """List configured models in the OpenAI-compatible /v1/models format."""
    model_entries = [
        {"id": name, "object": "model", "created": int(time.time()), "owned_by": "grok"}
        for name in CONFIG["MODELS"].keys()
    ]
    return jsonify({"object": "list", "data": model_entries})

def _create_request_headers(cookie, statsig_id):
    """Build upstream request headers from the defaults, statsig id and cookies.

    When a cf_clearance value is configured it is appended to the cookie
    string; the Cookie header is only set when the result is non-empty.
    """
    headers = dict(DEFAULT_HEADERS)
    headers["x-statsig-id"] = statsig_id
    clearance = CONFIG["SERVER"].get('CF_CLEARANCE')
    combined_cookie = f"{cookie};{clearance}" if clearance else cookie
    if combined_cookie:
        headers["Cookie"] = combined_cookie
    return headers

@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """OpenAI-compatible chat completions endpoint backed by Grok.

    Authenticates the caller (fixed API key, or — in custom-SSO mode — the
    bearer token itself becomes the Grok SSO cookie), then retries the
    upstream request up to RETRY.MAX_ATTEMPTS times, rotating through the
    available tokens. Streaming requests are relayed as SSE; non-streaming
    responses are aggregated before returning.
    """
    # Status code for the final error response; flipped to 200 once any
    # upstream attempt has succeeded.
    response_status_code = 500
    try:
        auth_token = request.headers.get('Authorization', '').replace('Bearer ', '')
        if not auth_token:
            return jsonify({"error": 'API key is missing'}), 401

        if CONFIG["API"]["IS_CUSTOM_SSO"]:
            # Single-SSO mode: the caller's bearer token IS the Grok SSO value.
            token_manager.set_token(f"sso={auth_token};sso-rw={auth_token}")
        elif auth_token != CONFIG["API"]["API_KEY"]:
            return jsonify({"error": 'Unauthorized'}), 401

        data = request.json
        model = data.get("model")
        stream = data.get("stream", False)

        # One x-statsig-id is fetched per request and reused across retries.
        statsig_id = Utils.get_statsig_id()
        req_ctx = RequestContext()
        grok_client = GrokApiClient(model)

        for attempt in range(CONFIG["RETRY"]["MAX_ATTEMPTS"]):
            # Each attempt draws the next token in rotation for this model.
            signature_cookie = token_manager.get_next_token_for_model(model, statsig_id)
            if not signature_cookie:
                raise ValueError('No available tokens with remaining requests for this model.')

            logger.info(f"Attempt {attempt + 1}: Using token ending with ...{signature_cookie[-10:]}", "Server")
            request_headers = _create_request_headers(signature_cookie, statsig_id)

            try:
                request_payload = grok_client.prepare_chat_request(data, request_headers)
                logger.debug(f"Request Payload: {json.dumps(request_payload, indent=2)}", "Server")

                proxy_options = Utils.get_proxy_options()
                # curl_cffi with Chrome impersonation to pass Grok's TLS checks.
                response = curl_requests.post(
                    f"{CONFIG['API']['BASE_URL']}/rest/app-chat/conversations/new",
                    headers=request_headers, data=json.dumps(request_payload),
                    impersonate="chrome133a", stream=True, **proxy_options
                )

                if response.status_code == 200:
                    response_status_code = 200
                    logger.info(f"Request successful. Active tokens for {model}: {token_manager.get_token_count_for_model(model)}", "Server")
                    if stream:
                        return Response(stream_with_context(handle_stream_response(response, model, req_ctx)), content_type='text/event-stream')
                    else:
                        content = handle_non_stream_response(response, model, req_ctx)
                        # A rate-limit error surfaced mid-body still triggers a
                        # retry with the next token (via the except below).
                        if "RateLimitError" in content:
                           raise ValueError("Rate limit error detected in non-stream response.")
                        return jsonify(MessageProcessor.create_chat_response(content, model))

                if response.status_code in [403, 429]:
                    error_msg = f"Token for {model} rate limited or IP blocked (status: {response.status_code})."
                    logger.warning(f"{error_msg} Moving to expired list and trying next token...", "Server")
                    if token_manager.get_token_count_for_model(model) == 0:
                        raise ValueError(f"{error_msg} All tokens for this model exhausted.")
                    continue
                else:
                    # Other upstream failures: log and retry with the next token.
                    logger.error(f"Token error! Status: {response.status_code}, Response: {response.text}", "Server")
                    continue

            except Exception as e:
                logger.error(f"Request processing exception: {e}", "Server")
                # In single-SSO mode there is only one token, so retrying is
                # pointless — surface the failure immediately.
                if CONFIG["API"]["IS_CUSTOM_SSO"]: raise
                continue

        raise ValueError(f'All {CONFIG["RETRY"]["MAX_ATTEMPTS"]} retry attempts failed for model {model}. Please try again later.')

    except Exception as error:
        logger.error(str(error), "ChatAPI")
        return jsonify({"error": {"message": str(error), "type": "server_error"}}), response_status_code

@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def catch_all(path):
    """Health-check fallback: any unmatched path reports that the API is up."""
    return 'API is running normally', 200

if __name__ == '__main__':
    # NOTE(review): token_manager is created only under this guard yet is
    # referenced by the route handlers above — presumably the app is always
    # launched via `python main.py` (as the Dockerfile CMD does); confirm no
    # WSGI server imports this module directly, or token_manager is undefined.
    token_manager = AuthTokenManager()
    initialization()
    app.run(host='0.0.0.0', port=CONFIG["SERVER"]["PORT"], debug=False)

参考链接