介绍
由于 grok 官网更新了验证机制,添加了 x-statsig-id 请求头,原项目目前已失效,博主根据该项目更新了抱脸部署的教程
感谢 3806080717 提供了 x-statsig-id 接口服务
部署
点击这里 创建空间 选择 Docker 然后在Files添加或者修改下面提到的环境变量和文件
环境变量
变量名 | 示例值 | 描述 |
---|---|---|
PROXY | socks5://**** | 可选,代理地址,支持 https 和 socks5 |
STATS_ID_URL | https://**** | 可选,x-statsig-id 接口地址,默认 https://rui.soundai.ee/x.php |
IS_CUSTOM_SSO | false | 可选,开启单SSO模式,默认 false |
SSO | eyJ****,eyJ**** | 关闭单SSO模式时必填,取自Grok官网Cookie的sso值 |
API_KEY | sk-12345 | 关闭单SSO模式时必填,默认 sk-123456,开启单SSO模式时选填 |
PICGO_KEY | chv_**** | 绘图必选,Picgo 图床密钥,与 TUMY_KEY 二选一 |
TUMY_KEY | **** | 绘图必选,TUMY图床密钥,与 PICGO_KEY 二选一 |
CF_CLEARANCE | cf_clearance=xxxxxx | 可选,Cloudflare 免托管凭据,取自Grok官网Cookie的cf_clearance值 |
IS_TEMP_CONVERSATION | true | 可选,开启临时对话,默认 true |
SHOW_THINKING | true | 可选,显示思考过程,默认false |
ISSHOW_SEARCH_RESULTS | true | 可选,显示搜索内容,默认 true |
MANAGER_SWITCH | false | 可选,开启管理后台,默认false |
ADMINPASSWORD | password | 可选,管理员密码,默认空值 |
配置文件
Dockerfile
# Slim Python 3.10 runtime image.
FROM python:3.10-slim
# Unprivileged runtime user (UID 1000) that owns and runs the application.
RUN useradd -m -u 1000 -s /bin/bash grok
RUN pip install --no-cache-dir flask requests curl_cffi werkzeug loguru python-dotenv
# Create the app directory explicitly and hand it to the runtime user:
# main.py writes token_status.json into this directory at runtime, which
# fails if the directory itself is still owned by root.
RUN mkdir -p /home/grok/app && chown grok:grok /home/grok/app
WORKDIR /home/grok/app
COPY --chown=grok:grok . .
USER grok
EXPOSE 7860
# Exec form must be a complete JSON array; the closing bracket was missing.
CMD ["python", "main.py"]
README.md
---
layout: mypost
title: Grok2API
emoji: 🦀
colorFrom: yellow
colorTo: red
sdk: docker
pinned: false
---
main.py
import os
import json
import uuid
import time
import base64
import sys
import inspect
import secrets
import re
from loguru import logger
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Dict, Any
import requests
from flask import Flask, request, Response, jsonify, stream_with_context, render_template, redirect, session
from curl_cffi import requests as curl_requests
from werkzeug.middleware.proxy_fix import ProxyFix
# Chat roles used when the incoming message list is flattened into one prompt.
ROLE_ASSISTANT = "assistant"
ROLE_USER = "user"
# "messageTag" value that process_model_response treats as the final answer.
MESSAGE_TAG_FINAL = "final"
# "type" values of multi-part message content items.
IMAGE_TYPE_TEXT = "text"
IMAGE_TYPE_URL = "image_url"
class Logger:
    """Thin wrapper around loguru that tags every record with the caller's location.

    The sink format reads ``extra[filename]``, ``extra[function]`` and
    ``extra[lineno]``; each logging method binds those values from the frame
    of the code that called it.
    """

    def __init__(self, level="INFO", colorize=True, format=None):
        """Remove loguru's existing sinks and install a single stderr sink.

        Args:
            level: Minimum level passed to the sink.
            colorize: Whether the sink emits colour markup.
            format: loguru format string; a default with time, level and
                caller location is used when omitted.
        """
        # Import under a private alias. The module-level name ``logger`` is
        # rebound to an instance of this wrapper right after the class is
        # defined, so relying on that global would make a second
        # instantiation call ``remove``/``add`` on the wrapper and fail.
        from loguru import logger as loguru_logger

        loguru_logger.remove()
        if format is None:
            format = (
                "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
                "<level>{level: <8}</level> | "
                "<cyan>{extra[filename]}</cyan>:<cyan>{extra[function]}</cyan>:<cyan>{extra[lineno]}</cyan> | "
                "<level>{message}</level>"
            )
        loguru_logger.add(sys.stderr, level=level, format=format, colorize=colorize, backtrace=True, diagnose=True)
        self.logger = loguru_logger

    def _get_caller_info(self):
        """Return filename, function name and line number of the logging call site.

        Must be called directly from one of the public logging methods: the
        lookup walks exactly two frames up (this method -> logging method ->
        caller).
        """
        frame = inspect.currentframe()
        try:
            caller_frame = frame.f_back.f_back
            return {
                'filename': os.path.basename(caller_frame.f_code.co_filename),
                'function': caller_frame.f_code.co_name,
                'lineno': caller_frame.f_lineno,
            }
        finally:
            # Drop the frame reference to avoid a reference cycle.
            del frame

    def info(self, message, source="API"):
        """Log ``message`` at INFO level, prefixed with ``[source]``."""
        self.logger.bind(**self._get_caller_info()).info(f"[{source}] {message}")

    def error(self, message, source="API"):
        """Log ``message`` at ERROR level; exceptions are logged with a traceback."""
        caller_info = self._get_caller_info()
        if isinstance(message, Exception):
            self.logger.bind(**caller_info).exception(f"[{source}] {str(message)}")
        else:
            self.logger.bind(**caller_info).error(f"[{source}] {message}")

    def warning(self, message, source="API"):
        """Log ``message`` at WARNING level, prefixed with ``[source]``."""
        self.logger.bind(**self._get_caller_info()).warning(f"[{source}] {message}")

    def debug(self, message, source="API"):
        """Log ``message`` at DEBUG level, prefixed with ``[source]``."""
        self.logger.bind(**self._get_caller_info()).debug(f"[{source}] {message}")
# Rebinds the module-level name ``logger`` from loguru's logger to the wrapper class.
logger = Logger(level="INFO")
# Directory holding token_status.json (see CONFIG["TOKEN_STATUS_FILE"]); created on import if missing.
DATA_DIR = Path("/home/grok/app")
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Runtime configuration, read once from the environment at import time.
CONFIG = {
    # Public model name -> model name sent upstream in the request payload.
    "MODELS": {
        'grok-2': 'grok-latest', 'grok-2-imageGen': 'grok-latest', 'grok-2-search': 'grok-latest',
        "grok-3": "grok-3", "grok-3-search": "grok-3", "grok-3-imageGen": "grok-3",
        "grok-3-deepsearch": "grok-3", "grok-3-deepersearch": "grok-3", "grok-3-reasoning": "grok-3"
    },
    "API": {
        "IS_TEMP_CONVERSATION": os.environ.get("IS_TEMP_CONVERSATION", "true").lower() == "true",
        "IS_CUSTOM_SSO": os.environ.get("IS_CUSTOM_SSO", "false").lower() == "true",
        "BASE_URL": os.environ.get("GROK_BASE_URL", "https://grok.com"),
        "STATS_ID_URL": os.environ.get("STATS_ID_URL", "https://rui.soundai.ee/x.php"),
        "API_KEY": os.environ.get("API_KEY", "sk-123456"),
        "PICGO_KEY": os.environ.get("PICGO_KEY"),
        "TUMY_KEY": os.environ.get("TUMY_KEY"),
        # Base retry delay in milliseconds.
        "RETRY_TIME": 1000,
        "PROXY": os.environ.get("PROXY")
    },
    "ADMIN": {
        # Parsed into a real boolean: the raw string "false" is truthy, which
        # previously switched the manager panel ON when set to "false".
        "MANAGER_SWITCH": os.environ.get("MANAGER_SWITCH", "").strip().lower() in ("true", "1", "yes", "on"),
        "PASSWORD": os.environ.get("ADMINPASSWORD")
    },
    "SERVER": {
        "CF_CLEARANCE": os.environ.get("CF_CLEARANCE"),
        "PORT": int(os.environ.get("PORT", 7860))
    },
    "RETRY": {"MAX_ATTEMPTS": 2},
    "TOKEN_STATUS_FILE": str(DATA_DIR / "token_status.json"),
    # Case-insensitive, like the other boolean flags.
    "SHOW_THINKING": os.environ.get("SHOW_THINKING", "false").lower() == "true",
    "ISSHOW_SEARCH_RESULTS": os.environ.get("ISSHOW_SEARCH_RESULTS", "true").lower() == "true"
}
# Browser-like request headers (Chrome 133 on macOS) merged into every upstream request.
# 'Origin' follows the configured base URL; callers add 'Cookie', 'Referer' and
# 'x-statsig-id' per request.
DEFAULT_HEADERS = {
'Accept': '*/*', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Accept-Encoding': 'gzip, deflate, br, zstd',
'Content-Type': 'text/plain;charset=UTF-8', 'Connection': 'keep-alive', 'Origin': CONFIG["API"]["BASE_URL"],
'Priority': 'u=1, i', 'Sec-Ch-Ua': '"Not(A:Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"',
'Sec-Ch-Ua-Mobile': '?0', 'Sec-Ch-Ua-Platform': '"macOS"', 'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
'Baggage': 'sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c'
}
@dataclass
class RequestContext:
    """Mutable per-request state shared by the response-processing helpers."""

    # True while a "<think>" block has been opened and not yet closed.
    is_thinking: bool = False
    # Set once a chunk carrying image metadata has been seen.
    is_img_gen: bool = False
    # Set after the generated image URLs have been processed.
    is_img_gen_done: bool = False
    # Each instance gets its own list (never shared between requests).
    image_urls: List[str] = field(default_factory=list)
class AuthTokenManager:
    """Pool of Grok SSO cookies grouped by normalized model name.

    Instance state:
        token_model_map:  pool name -> list of ``{"token", "AddedTime"}`` entries.
        token_status_map: sso value -> pool name -> status dict; this is what
            gets persisted to ``CONFIG["TOKEN_STATUS_FILE"]``.
        expired_tokens:   set of ``(token, pool, expired_at, window_seconds)``
            tuples waiting to be re-activated by the background loop.
    """

    def __init__(self):
        self.token_model_map = {}
        self.expired_tokens = set()
        self.token_status_map = {}
        self.model_config = {model: {} for model in CONFIG["MODELS"]}
        self.token_reset_switch = False
        self.token_reset_timer = None
        self.load_token_status()

    @staticmethod
    def _extract_sso(token):
        """Return the value of the ``sso=`` pair inside a cookie string."""
        return token.split("sso=")[1].split(";")[0]

    def _model_pools(self):
        """Return the set of normalized pool names for all configured models."""
        return {self.normalize_model_name(m) for m in self.model_config}

    def _ensure_reset_process(self):
        """Start the background re-activation thread the first time it is needed."""
        if not self.token_reset_switch:
            self.start_token_reset_process()
            self.token_reset_switch = True

    def save_token_status(self):
        """Persist ``token_status_map`` as JSON; failures are logged, not raised."""
        try:
            # Build the payload BEFORE opening the file so that a failure here
            # cannot leave a truncated status file behind.
            save_data = {
                sso: {
                    model: {
                        "isValid": status.get("isValid"),
                        "invalidatedTime": status.get("invalidatedTime"),
                        "totalRequestCount": status.get("totalRequestCount", 0),
                    }
                    for model, status in list(models.items())
                }
                for sso, models in list(self.token_status_map.items())
            }
            with open(CONFIG["TOKEN_STATUS_FILE"], 'w', encoding='utf-8') as f:
                json.dump(save_data, f, indent=2, ensure_ascii=False)
            logger.debug("Token status saved to file.", "TokenManager")
        except Exception as error:
            logger.error(f"Failed to save token status: {str(error)}", "TokenManager")

    def load_token_status(self):
        """Load ``token_status_map`` from the status file if it exists."""
        try:
            token_status_file = Path(CONFIG["TOKEN_STATUS_FILE"])
            if token_status_file.exists():
                with open(token_status_file, 'r', encoding='utf-8') as f:
                    self.token_status_map = json.load(f)
                logger.info("Loaded token status from file.", "TokenManager")
        except Exception as error:
            logger.error(f"Failed to load token status: {str(error)}", "TokenManager")

    def add_token(self, token, isinitialization=False):
        """Add ``token`` to every model pool and make sure it has a status entry.

        Args:
            token: Cookie string containing an ``sso=`` pair.
            isinitialization: When true the status file is not written (the
                caller saves once after adding all tokens).
        """
        sso = self._extract_sso(token)
        sso_status = self.token_status_map.setdefault(sso, {})
        for model_pool in self._model_pools():
            pool = self.token_model_map.setdefault(model_pool, [])
            if not any(entry["token"] == token for entry in pool):
                pool.append({"token": token, "AddedTime": int(time.time() * 1000)})
            if model_pool not in sso_status:
                sso_status[model_pool] = {
                    "isValid": True, "invalidatedTime": None, "totalRequestCount": 0
                }
        if not isinitialization:
            self.save_token_status()

    def set_token(self, token):
        """Replace all pools with the single ``token`` (single-SSO mode)."""
        pools = self._model_pools()
        added_ms = int(time.time() * 1000)
        self.token_model_map = {pool: [{"token": token, "AddedTime": added_ms}] for pool in pools}
        sso = self._extract_sso(token)
        self.token_status_map[sso] = {pool: {
            "isValid": True, "invalidatedTime": None, "totalRequestCount": 0
        } for pool in pools}

    def delete_token(self, token):
        """Remove ``token`` everywhere and persist. Returns True on success."""
        try:
            sso = self._extract_sso(token)
            for model_pool in list(self.token_model_map):
                self.token_model_map[model_pool] = [
                    e for e in self.token_model_map[model_pool] if e["token"] != token
                ]
            # Also forget pending re-activations; otherwise the reset loop
            # would put the deleted token back into the pool later.
            self.expired_tokens = {info for info in self.expired_tokens if info[0] != token}
            self.token_status_map.pop(sso, None)
            self.save_token_status()
            # Log only the tail of the cookie, like the other log lines do.
            logger.info(f"Token successfully removed: ...{token[-10:]}", "TokenManager")
            return True
        except Exception as error:
            logger.error(f"Token deletion failed: {str(error)}")
            return False

    def _query_rate_limit(self, cookie: str, model_id: str, statsig_id: str) -> Dict[str, Any]:
        """POST to the upstream rate-limit endpoint and return its JSON body.

        Raises:
            Exception: Any request/HTTP/JSON error is logged and re-raised.
        """
        url = f"{CONFIG['API']['BASE_URL']}/rest/rate-limits"
        # Order matters: "deepersearch" must be tested before "deepsearch".
        if "reasoning" in model_id:
            requestKind = "REASONING"
        elif "deepersearch" in model_id:
            requestKind = "DEEPERSEARCH"
        elif "deepsearch" in model_id:
            requestKind = "DEEPSEARCH"
        else:
            requestKind = "DEFAULT"
        payloadModelName = CONFIG["MODELS"][model_id]
        full_cookie = f"{cookie};{CONFIG['SERVER']['CF_CLEARANCE']}" if CONFIG["SERVER"].get('CF_CLEARANCE') else cookie
        headers = {**DEFAULT_HEADERS, 'Content-Type': 'application/json', 'Referer': f"{CONFIG['API']['BASE_URL']}/", 'x-statsig-id': statsig_id, 'Cookie': full_cookie}
        payload = {"requestKind": requestKind, "modelName": payloadModelName}
        try:
            response = curl_requests.post(url, headers=headers, json=payload, impersonate="chrome133a", timeout=10, **Utils.get_proxy_options())
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error querying rate limit for token ...{cookie[-10:]} ({model_id}): {e}", "TokenManager")
            raise

    def get_next_token_for_model(self, model_id: str, statsig_id: str, is_return: bool = False):
        """Return the first pooled token that still has quota for ``model_id``.

        Tokens reporting no remaining queries, or whose rate-limit query
        fails, are moved to the expired set. With ``is_return`` true the first
        pooled token is returned without querying. Returns None when no token
        qualifies.
        """
        normalized_model = self.normalize_model_name(model_id)
        pool = self.token_model_map.get(normalized_model)
        if not pool:
            return None
        if is_return:
            return pool[0]["token"]
        # Iterate over a copy: move_to_expired mutates the pool.
        for token_entry in list(pool):
            token = token_entry["token"]
            sso = self._extract_sso(token)
            status = self.token_status_map.get(sso, {}).get(normalized_model)
            try:
                rate_limit_data = self._query_rate_limit(token, model_id, statsig_id)
                # ``or 0`` also covers an explicit null in the response.
                remaining_queries = rate_limit_data.get("remainingQueries", 0) or 0
                if remaining_queries > 0:
                    logger.info(f"Token ...{token[-10:]} has {remaining_queries} requests remaining for {model_id}. Using it.", "TokenManager")
                    if status is not None:
                        status.update({"isValid": True, "totalRequestCount": status.get("totalRequestCount", 0) + 1})
                    self._ensure_reset_process()
                    self.save_token_status()
                    return token
                logger.warning(f"Token ...{token[-10:]} for {model_id} has 0 requests. Moving to expired list.", "TokenManager")
                if status is not None:
                    status.update({"isValid": False, "invalidatedTime": int(time.time() * 1000)})
                window_size_seconds = rate_limit_data.get("windowSizeSeconds", 3600)
                self.move_to_expired(normalized_model, token, window_size_seconds)
                self.save_token_status()
            except Exception:
                logger.error(f"Failed to query rate limit for token ...{token[-10:]}. Assuming invalid, moving to expired list with default timeout.", "TokenManager")
                self.move_to_expired(normalized_model, token, 3600)
        logger.warning(f"No available tokens with remaining requests found for model {model_id}.", "TokenManager")
        return None

    def move_to_expired(self, model_id: str, token: str, window_size_seconds: int):
        """Take ``token`` out of pool ``model_id`` and schedule its re-activation."""
        model_tokens = self.token_model_map.get(model_id, [])
        token_index = next((i for i, entry in enumerate(model_tokens) if entry["token"] == token), -1)
        if token_index != -1:
            model_tokens.pop(token_index)
            self.expired_tokens.add((token, model_id, time.time(), window_size_seconds))
            self._ensure_reset_process()

    def normalize_model_name(self, model):
        """Map a model name to its pool: ``grok-N-*`` -> ``grok-N`` except deepsearch/reasoning variants."""
        if model.startswith('grok-') and 'deepsearch' not in model and 'reasoning' not in model:
            return '-'.join(model.split('-')[:2])
        return model

    def get_token_count_for_model(self, model_id):
        """Return how many tokens are currently pooled for ``model_id``."""
        return len(self.token_model_map.get(self.normalize_model_name(model_id), []))

    def _reactivate_expired_tokens(self):
        """Move tokens whose rate-limit window has elapsed back into their pool."""
        now = time.time()
        reactivated = set()
        # Iterate over a snapshot: request threads add to the set concurrently
        # and mutating a set during iteration raises RuntimeError.
        for token_info in list(self.expired_tokens):
            token, model, expired_timestamp, window_size_seconds = token_info
            if now - expired_timestamp < window_size_seconds:
                continue
            pool = self.token_model_map.setdefault(model, [])
            if not any(e["token"] == token for e in pool):
                pool.append({"token": token, "AddedTime": int(now * 1000)})
            sso = self._extract_sso(token)
            if sso in self.token_status_map and model in self.token_status_map[sso]:
                self.token_status_map[sso][model].update({"isValid": True, "invalidatedTime": None, "totalRequestCount": 0})
            reactivated.add(token_info)
        if reactivated:
            self.expired_tokens -= reactivated
            self.save_token_status()
            logger.info(f"Re-activated {len(reactivated)} token(s) after rate limit window.", "TokenManager")

    def start_token_reset_process(self):
        """Start a daemon thread that re-activates expired tokens every 60 seconds."""
        import threading

        def reset_expired_tokens_loop():
            while True:
                try:
                    self._reactivate_expired_tokens()
                except Exception as error:
                    # One bad iteration must not kill the loop, otherwise
                    # expired tokens would never come back.
                    logger.error(f"Token reset loop iteration failed: {error}", "TokenManager")
                time.sleep(60)

        timer_thread = threading.Thread(target=reset_expired_tokens_loop)
        timer_thread.daemon = True
        timer_thread.start()

    def get_all_tokens(self):
        """Return the distinct tokens present in any pool."""
        return list({e["token"] for model_tokens in self.token_model_map.values() for e in model_tokens})

    def get_current_token(self, model_id):
        """Return the first pooled token for ``model_id`` or None."""
        pool = self.token_model_map.get(self.normalize_model_name(model_id))
        if not pool:
            return None
        return pool[0]["token"]

    def get_token_status_map(self):
        """Return the live status map (not a copy)."""
        return self.token_status_map
class Utils:
    """Stateless helpers shared by the token manager, API client and handlers."""

    @staticmethod
    def get_statsig_id():
        """Fetch an ``x-statsig-id`` value from the configured signing service.

        Raises:
            ValueError: If the request fails or the response carries no
                ``x_statsig_id`` field.
        """
        try:
            response = curl_requests.get(CONFIG["API"]["STATS_ID_URL"], impersonate="chrome133a", timeout=10, **Utils.get_proxy_options())
            response.raise_for_status()
            statsig_id = response.json().get("x_statsig_id")
            if not statsig_id:
                raise ValueError("Invalid statsig ID received.")
            logger.debug(f"Successfully obtained x-statsig-id: {statsig_id}", "Utils")
            return statsig_id
        except Exception as e:
            raise ValueError(f"Could not retrieve x-statsig-id: {e}") from e

    @staticmethod
    def organize_search_results(search_results):
        """Render search results as collapsible ``<details>`` blocks joined by blank lines.

        Returns an empty string when there is no ``results`` key.
        """
        if not search_results or 'results' not in search_results:
            return ''
        results = [
            f"\r\n<details><summary>资料[{i}]: {r.get('title', '未知标题')}</summary>\r\n{r.get('preview', '无预览内容')}\r\n\n[Link]({r.get('url', '#')})\r\n</details>"
            for i, r in enumerate(search_results['results'])
        ]
        return '\n\n'.join(results)

    @staticmethod
    def get_proxy_options():
        """Translate ``CONFIG["API"]["PROXY"]`` into curl_cffi keyword arguments.

        Returns:
            ``{}`` without a proxy; ``{"proxy": ...}`` (plus ``"proxy_auth"``
            when credentials are embedded) for socks5; ``{"proxies": ...}``
            for any other scheme.
        """
        proxy = CONFIG["API"]["PROXY"]
        if not proxy:
            return {}
        if proxy.startswith("socks5://"):
            rest = proxy.partition("://")[2]
            # Split on the LAST "@" and the FIRST ":" so that passwords
            # containing "@" or ":" (or an absent password) do not break parsing.
            credentials, at_sign, host_part = rest.rpartition("@")
            if at_sign:
                username, _, password = credentials.partition(":")
                return {"proxy": f"socks5://{host_part}", "proxy_auth": (username, password)}
            return {"proxy": proxy}
        return {"proxies": {"https": proxy, "http": proxy}}
class GrokApiClient:
    """Builds upstream chat payloads and uploads attachments for one model."""

    def __init__(self, model_id):
        """Resolve ``model_id`` to the upstream model name.

        Raises:
            ValueError: If ``model_id`` is not a key of ``CONFIG["MODELS"]``.
        """
        if model_id not in CONFIG["MODELS"]:
            raise ValueError(f"Unsupported model: {model_id}")
        self.model_id = CONFIG["MODELS"][model_id]

    def get_image_type(self, base64_string):
        """Derive MIME type and a file name from a data URL (defaults to JPEG)."""
        mime_type = 'image/jpeg'
        if 'data:image' in base64_string:
            match = re.search(r'data:([a-zA-Z0-9]+\/[a-zA-Z0-9-.+]+);base64,', base64_string)
            if match:
                mime_type = match.group(1)
        extension = mime_type.split('/')[1]
        return {"mimeType": mime_type, "fileName": f"image.{extension}"}

    def _upload_file(self, upload_data, headers, url_path):
        """POST ``upload_data`` as JSON to ``url_path``; return ``fileMetadataId`` or ``""``.

        Errors are logged and re-raised.
        """
        try:
            logger.info(f"Sending file upload request to {url_path}", "Server")
            proxy_options = Utils.get_proxy_options()
            response = curl_requests.post(
                f"{CONFIG['API']['BASE_URL']}{url_path}",
                headers=headers, json=upload_data,
                impersonate="chrome133a", **proxy_options
            )
            response.raise_for_status()
            result = response.json()
            logger.info(f"File upload successful: {result}", "Server")
            return result.get("fileMetadataId", "")
        except Exception as error:
            logger.error(f"File upload failed: {error}", "Server")
            raise

    def upload_base64_file(self, message, headers):
        """Upload ``message`` (text) as a base64-encoded ``message.txt`` attachment."""
        message_base64 = base64.b64encode(message.encode('utf-8')).decode('utf-8')
        upload_data = {"fileName": "message.txt", "fileMimeType": "text/plain", "content": message_base64}
        return self._upload_file(upload_data, headers, "/rest/app-chat/upload-file")

    def upload_base64_image(self, base64_data, headers):
        """Upload an image given as raw base64 or as a ``data:image/...`` URL."""
        image_buffer = base64_data.split(',')[1] if 'data:image' in base64_data else base64_data
        image_info = self.get_image_type(base64_data)
        upload_data = {
            "rpc": "uploadFile",
            "req": {"fileName": image_info["fileName"], "fileMimeType": image_info["mimeType"], "content": image_buffer}
        }
        return self._upload_file(upload_data, headers, "/api/rpc")

    def prepare_chat_request(self, request_data, headers):
        """Flatten an OpenAI-style request into the upstream conversation payload.

        Consecutive messages of the same role are merged into one turn; images
        in the last message are uploaded as attachments; once the accumulated
        text reaches 40000 characters the history so far is uploaded as a file.

        Args:
            request_data: Parsed request body with ``model`` and ``messages``.
            headers: Headers (including cookies) used for upload requests.

        Raises:
            ValueError: For streaming image generation without an image-host
                key, for an empty conversation, or when a single-turn model
                does not end with a user message.
        """
        model = request_data["model"]
        if 'imageGen' in model and not CONFIG["API"]["PICGO_KEY"] and not CONFIG["API"]["TUMY_KEY"] and request_data.get("stream", False):
            raise ValueError("Streaming for image generation models requires PICGO or TUMY key!")

        todo_messages = request_data.get("messages")
        if not todo_messages:
            # Previously an empty list raised IndexError for single-turn models.
            raise ValueError('Message content is empty!')
        if model in ['grok-2-imageGen', 'grok-3-imageGen', 'grok-3-deepsearch']:
            if todo_messages[-1]["role"] != 'user':
                raise ValueError('The last message must be from the user for this model!')
            todo_messages = [todo_messages[-1]]

        def remove_think_tags(text):
            text = re.sub(r'<think>[\s\S]*?<\/think>', '', text, flags=re.DOTALL).strip()
            return re.sub(r'!\[image\]\(data:.*?base64,.*?\)', '[图片]', text)

        def process_content(content):
            if isinstance(content, list):
                return '\n'.join(
                    '[图片]' if item.get("type") == IMAGE_TYPE_URL else remove_think_tags(item.get("text", ""))
                    for item in content
                )
            return remove_think_tags(str(content)) if content else ''

        file_attachments = []
        messages = ''
        last_content = ''
        last_role = None
        # Offset in ``messages`` where the most recent turn starts. Searching
        # for "ROLE:" with rfind can match text inside the message itself and
        # corrupt the prompt when merging same-role messages.
        last_start = 0
        message_length = 0
        convert_to_file = False

        for i, current in enumerate(todo_messages):
            role = ROLE_ASSISTANT if current["role"] == ROLE_ASSISTANT else ROLE_USER
            is_last_message = i == len(todo_messages) - 1

            if is_last_message and isinstance(current.get("content"), list):
                for item in current["content"]:
                    if item.get("type") == IMAGE_TYPE_URL:
                        try:
                            file_id = self.upload_base64_image(item["image_url"]["url"], headers)
                            if file_id:
                                file_attachments.append(file_id)
                        except Exception:
                            logger.warning("Failed to upload an image, skipping it.", "RequestPrep")

            text_content = process_content(current.get("content", ""))

            if message_length >= 40000 and not convert_to_file:
                convert_to_file = True
                try:
                    file_id = self.upload_base64_file(messages, headers)
                    if file_id:
                        # Drop the inline history only once it is safely
                        # attached; otherwise it would be lost silently.
                        file_attachments.insert(0, file_id)
                        messages = ""
                        last_start = 0
                except Exception:
                    logger.error("Failed to upload long conversation as file.", "RequestPrep")

            if text_content or (is_last_message and file_attachments):
                if role == last_role and text_content:
                    last_content += '\n' + text_content
                    messages = messages[:last_start] + f"{role.upper()}: {last_content}\n"
                else:
                    last_start = len(messages)
                    messages += f"{role.upper()}: {text_content or '[图片]'}\n"
                    last_content = text_content
                last_role = role
            message_length += len(text_content)

        if not messages.strip():
            if file_attachments:
                messages = 'Based on the attached file(s), please respond.'
            else:
                raise ValueError('Message content is empty!')

        deepsearchPreset = ''
        if model == 'grok-3-deepsearch':
            deepsearchPreset = 'default'
        elif model == 'grok-3-deepersearch':
            deepsearchPreset = 'deeper'
        search = 'search' in model

        return {
            "temporary": CONFIG["API"]["IS_TEMP_CONVERSATION"], "modelName": self.model_id,
            "message": messages.strip(), "fileAttachments": file_attachments[:4], "imageAttachments": [],
            "disableArtifact": False, "isFromGrokFiles": False,
            "disableSearch": False, "enableImageGeneration": True, "returnImageBytes": False,
            "returnRawGrokInXaiRequest": False, "enableImageStreaming": False, "imageGenerationCount": 1,
            "forceConcise": False, "toolOverrides": {"imageGen": 'imageGen' in model, "webSearch": search,
            "xSearch": search, "xMediaSearch": search, "trendsSearch": search, "xPostAnalyze": search},
            "enableSideBySide": True, "sendFinalMetadata": True, "customPersonality": "",
            "deepsearchPreset": deepsearchPreset, "isReasoning": model == 'grok-3-reasoning',
            "disableTextFollowUps": True
        }
class MessageProcessor:
    """Factory for OpenAI-compatible chat completion payloads."""

    @staticmethod
    def create_chat_response(message, model, is_stream=False):
        """Wrap ``message`` in a completion object, or a chunk object when streaming."""
        payload = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "created": int(time.time()),
            "model": model,
        }
        if not is_stream:
            payload["object"] = "chat.completion"
            payload["choices"] = [
                {"index": 0, "message": {"role": ROLE_ASSISTANT, "content": message}, "finish_reason": "stop"}
            ]
            payload["usage"] = None
            return payload
        payload["object"] = "chat.completion.chunk"
        payload["choices"] = [{"index": 0, "delta": {"content": message}}]
        return payload
def process_model_response(response_data: Dict[str, Any], model: str, req_ctx: RequestContext) -> str:
    """Turn one upstream response chunk into the text to emit for it.

    Opens/closes ``<think>`` blocks for deepsearch and reasoning models by
    flipping ``req_ctx.is_thinking``, and hides thinking output entirely when
    ``CONFIG["SHOW_THINKING"]`` is off.

    Args:
        response_data: The ``result.response`` object of one streamed line.
        model: Public model name from the request.
        req_ctx: Per-request state; ``is_thinking`` is mutated here.

    Returns:
        The text for this chunk; an empty string when nothing should be emitted.
    """
    token = response_data.get("token")
    # Never hand None back to callers: the signature promises a string.
    text = token if token is not None else ""

    if 'search' in model and response_data.get("webSearchResults") and CONFIG["ISSHOW_SEARCH_RESULTS"]:
        return f"\r\n<think>{Utils.organize_search_results(response_data['webSearchResults'])}</think>\r\n"

    if model in ['grok-3-deepsearch', 'grok-3-deepersearch']:
        step_id = response_data.get("messageStepId")
        message_tag = response_data.get("messageTag")
        if step_id and not CONFIG["SHOW_THINKING"]:
            return ""
        if step_id and not req_ctx.is_thinking:
            req_ctx.is_thinking = True
            return f"<think>{token or ''}"
        if not step_id and req_ctx.is_thinking and message_tag == MESSAGE_TAG_FINAL:
            req_ctx.is_thinking = False
            return f"</think>{token or ''}"
        if (step_id and req_ctx.is_thinking and message_tag == ROLE_ASSISTANT) or message_tag == MESSAGE_TAG_FINAL:
            # Previously returned the raw token, which could be None.
            return text
        if req_ctx.is_thinking and isinstance(token, dict) and token.get("action") == "webSearch":
            return token.get("action_input", {}).get("query", "")
        if req_ctx.is_thinking and response_data.get("webSearchResults"):
            return Utils.organize_search_results(response_data['webSearchResults'])
        return text

    if model == 'grok-3-reasoning':
        thinking = response_data.get("isThinking")
        if thinking and not CONFIG["SHOW_THINKING"]:
            return ""
        if thinking and not req_ctx.is_thinking:
            req_ctx.is_thinking = True
            return f"<think>{token or ''}"
        if not thinking and req_ctx.is_thinking:
            req_ctx.is_thinking = False
            return f"</think>{token or ''}"
        return text

    return text
def handle_image_responses(image_urls: List[str]) -> str:
    """Download generated images and return them as markdown image links.

    Each image is fetched from ``assets.grok.com`` and then, in order of
    preference, uploaded to PicGo, uploaded to TUMY, or embedded as a base64
    data URL. Every image is tried up to ``CONFIG["RETRY"]["MAX_ATTEMPTS"]``
    times; after the last failure a plain-text error line is emitted instead.

    Args:
        image_urls: Asset paths relative to ``https://assets.grok.com/``.

    Returns:
        One line per image joined by newlines; ``""`` for empty input.
    """
    if not image_urls:
        return ""
    processed_images = []
    max_attempts = CONFIG["RETRY"]["MAX_ATTEMPTS"]
    for image_url in image_urls:
        for attempt in range(max_attempts):
            try:
                cookie = token_manager.get_current_token('grok-2') or ''
                if CONFIG["SERVER"].get('CF_CLEARANCE'):
                    cookie += f";{CONFIG['SERVER']['CF_CLEARANCE']}"
                response = curl_requests.get(f"https://assets.grok.com/{image_url}", headers={**DEFAULT_HEADERS, "Cookie": cookie}, impersonate="chrome133a", **Utils.get_proxy_options())
                response.raise_for_status()
                image_buffer = response.content

                if CONFIG["API"]["PICGO_KEY"]:
                    files, headers = {'source': ('image.jpg', image_buffer, 'image/jpeg')}, {"X-API-Key": CONFIG["API"]["PICGO_KEY"]}
                    upload_resp = requests.post("https://www.picgo.net/api/1/upload", files=files, headers=headers, timeout=30)
                    if upload_resp.ok:
                        # The markdown image syntax had been lost here, leaving
                        # an invalid f-string with a stray closing brace.
                        processed_images.append(f"![image]({upload_resp.json()['image']['url']})")
                        break
                if CONFIG["API"]["TUMY_KEY"]:
                    files, headers = {'file': ('image.jpg', image_buffer, 'image/jpeg')}, {"Accept": "application/json", 'Authorization': f"Bearer {CONFIG['API']['TUMY_KEY']}"}
                    upload_resp = requests.post("https://tu.my/api/v1/upload", files=files, headers=headers, timeout=30)
                    if upload_resp.ok:
                        processed_images.append(f"![image]({upload_resp.json()['data']['links']['url']})")
                        break

                # No image host configured (or the upload was rejected):
                # embed the image inline as a data URL.
                base64_image = base64.b64encode(image_buffer).decode('utf-8')
                content_type = response.headers.get('content-type', 'image/jpeg')
                processed_images.append(f"![image](data:{content_type};base64,{base64_image})")
                break
            except Exception as error:
                logger.error(f"Image ({image_url}) processing attempt {attempt+1} failed: {error}", "ImageHandler")
                if attempt + 1 == max_attempts:
                    processed_images.append(f"Failed to generate or upload image: {image_url}")
                else:
                    # Linear back-off, only when another attempt follows.
                    time.sleep(CONFIG["API"]["RETRY_TIME"] / 1000 * (attempt + 1))
    return "\n".join(processed_images)
def handle_non_stream_response(response, model, req_ctx: RequestContext):
    """Consume the whole upstream stream and return the assembled answer text.

    Args:
        response: Upstream streaming response exposing ``iter_lines()``.
        model: Public model name from the request.
        req_ctx: Per-request state updated while chunks are processed.

    Returns:
        The concatenated text followed by image markdown, or the JSON string
        ``{"error": "RateLimitError"}`` when upstream reports an error.
    """
    text_parts = []
    image_markdown = ""
    try:
        for chunk in response.iter_lines():
            if not chunk:
                continue
            # Bound before the try so the error log below always has a value,
            # even when decoding the chunk itself fails.
            line = chunk
            try:
                line = chunk.decode("utf-8").strip()
                if not line:
                    continue
                line_json = json.loads(line)
                if line_json.get("error"):
                    logger.error(json.dumps(line_json, indent=2), "Server")
                    return json.dumps({"error": "RateLimitError"})
                response_data = line_json.get("result", {}).get("response")
                if not response_data:
                    continue
                if response_data.get("imageDimensions") or response_data.get("imageAttachmentInfo"):
                    req_ctx.is_img_gen = True
                text_token = process_model_response(response_data, model, req_ctx)
                if text_token:
                    text_parts.append(text_token)
                model_response = response_data.get("modelResponse")
                if (
                    req_ctx.is_img_gen
                    and not req_ctx.is_img_gen_done
                    and model_response
                    and model_response.get("generatedImageUrls")
                ):
                    logger.info(f"Final image data received. Processing {len(model_response['generatedImageUrls'])} images.", "Server")
                    image_markdown = handle_image_responses(model_response["generatedImageUrls"])
                    req_ctx.is_img_gen_done = True
            except Exception as e:
                logger.error(f"Error processing non-stream chunk '{line}': {e}", "Server")
    finally:
        # Release the upstream connection once the body has been consumed.
        close = getattr(response, "close", None)
        if callable(close):
            close()
    return "".join(text_parts) + image_markdown
def handle_stream_response(response, model, req_ctx: RequestContext):
    """Return a generator translating the upstream stream into SSE chunks.

    Each text piece is emitted as a ``data: {chat.completion.chunk}`` event;
    the stream ends with ``data: [DONE]``. When upstream reports an error, one
    ``{"error": "RateLimitError"}`` event is emitted and the stream stops
    without the DONE marker.
    """
    def generate():
        try:
            for chunk in response.iter_lines():
                if not chunk:
                    continue
                # Bound before the try so the error log below always has a
                # value, even when decoding the chunk itself fails.
                line = chunk
                try:
                    line = chunk.decode("utf-8").strip()
                    if not line:
                        continue
                    line_json = json.loads(line)
                    if line_json.get("error"):
                        logger.error(json.dumps(line_json, indent=2), "Server")
                        yield f"data: {json.dumps({'error': 'RateLimitError'})}\n\n"
                        return
                    response_data = line_json.get("result", {}).get("response")
                    if not response_data:
                        continue
                    if response_data.get("imageDimensions") or response_data.get("imageAttachmentInfo"):
                        req_ctx.is_img_gen = True
                    text_token = process_model_response(response_data, model, req_ctx)
                    if text_token:
                        yield f"data: {json.dumps(MessageProcessor.create_chat_response(text_token, model, True))}\n\n"
                    model_response = response_data.get("modelResponse")
                    if (
                        req_ctx.is_img_gen
                        and not req_ctx.is_img_gen_done
                        and model_response
                        and model_response.get("generatedImageUrls")
                    ):
                        logger.info(f"Final image data received in stream. Processing {len(model_response['generatedImageUrls'])} images.", "Server")
                        image_markdown = handle_image_responses(model_response["generatedImageUrls"])
                        if image_markdown:
                            yield f"data: {json.dumps(MessageProcessor.create_chat_response(image_markdown, model, True))}\n\n"
                        req_ctx.is_img_gen_done = True
                except Exception as e:
                    logger.error(f"Error processing stream chunk '{line}': {e}", "Server")
            yield "data: [DONE]\n\n"
        finally:
            # Also runs when the client disconnects and the generator is
            # closed early, so the upstream connection is not leaked.
            close = getattr(response, "close", None)
            if callable(close):
                close()
    return generate()
def initialization():
    """Load persisted token status and register the tokens listed in ``SSO``.

    ``SSO`` is a comma-separated list of sso cookie values; surrounding
    whitespace and empty entries are ignored.
    """
    # Strip each entry: "a, b" used to register " b" (with the leading space)
    # as a cookie value.
    sso_array = [entry.strip() for entry in os.environ.get("SSO", "").split(',')]
    logger.info("Loading tokens...", "Server")
    token_manager.load_token_status()
    for sso in sso_array:
        if sso:
            token_manager.add_token(f"sso-rw={sso};sso={sso}", isinitialization=True)
    token_manager.save_token_status()
    logger.info(f"Loaded {len(token_manager.get_all_tokens())} unique tokens.", "Server")
    proxy = CONFIG["API"]["PROXY"]
    if proxy:
        # Log only what follows the credentials so a proxy password never
        # ends up in the log output.
        logger.info(f"Proxy is set: {proxy.rpartition('@')[2]}", "Server")
    logger.info("Initialization complete.", "Server")
# Flask application wrapped in werkzeug's ProxyFix middleware
# (NOTE(review): presumably because the service sits behind a reverse proxy — confirm).
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app)
# Session-signing key. Without FLASK_SECRET_KEY a fresh random key is generated
# on every start, so manager login sessions do not survive a restart.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', secrets.token_hex(16))
# Keep JSON keys in insertion order instead of sorting them.
app.json.sort_keys = False
@app.route('/manager/login', methods=['GET', 'POST'])
def manager_login():
    """Render the manager login form (GET) or check the submitted password (POST).

    Redirects to ``/`` when the manager panel is disabled. A login succeeds
    only when an admin password is configured AND the submitted one matches.
    """
    # The switch may be a raw environment string, where "false" is truthy;
    # interpret it explicitly (this also accepts an already-parsed bool).
    switch = str(CONFIG["ADMIN"]["MANAGER_SWITCH"] or "").strip().lower()
    if switch not in ("true", "1", "yes", "on"):
        return redirect('/')
    if request.method != 'POST':
        return render_template('login.html', error=False)

    expected = CONFIG["ADMIN"]["PASSWORD"]
    supplied = request.form.get('password')
    # Without a configured password, ``None == None`` used to log in anyone
    # who posted no password field. Require both values, and compare in
    # constant time (on bytes, so non-ASCII passwords work).
    if expected and supplied is not None and secrets.compare_digest(
        supplied.encode('utf-8'), expected.encode('utf-8')
    ):
        session['is_logged_in'] = True
        return redirect('/manager')
    return render_template('login.html', error=True)
def check_auth():
    """Return the session's ``is_logged_in`` flag (False when absent)."""
    logged_in = session.get('is_logged_in', False)
    return logged_in
@app.route('/manager')
def manager():
    """Serve the manager page, or redirect to the login form when not logged in."""
    if check_auth():
        return render_template('manager.html')
    return redirect('/manager/login')
@app.route('/manager/api/get')
def get_manager_tokens():
    """Return the token status map as JSON; 401 when not logged in."""
    if check_auth():
        return jsonify(token_manager.get_token_status_map())
    return jsonify({"error": "Unauthorized"}), 401
@app.route('/manager/api/add', methods=['POST'])
def add_manager_token():
    """Add the SSO value from the JSON body to the token pool.

    Responds 401 when not logged in, 400 without an ``sso`` field and 500 on
    any exception.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        sso = request.json.get('sso')
        if sso:
            token_manager.add_token(f"sso-rw={sso};sso={sso}")
            return jsonify({"success": True})
        return jsonify({"error": "SSO token is required"}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/manager/api/delete', methods=['POST'])
def delete_manager_token():
    """Remove the SSO value from the JSON body from the token pool.

    Responds 401 when not logged in, 400 without an ``sso`` field and 500
    when the deletion fails or raises.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        sso = request.json.get('sso')
        if not sso:
            return jsonify({"error": "SSO token is required"}), 400
        # delete_token reports failure through its return value; do not claim
        # success when it returned False.
        if not token_manager.delete_token(f"sso-rw={sso};sso={sso}"):
            return jsonify({"error": "Failed to delete token"}), 500
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/manager/api/cf_clearance', methods=['POST'])
def setCf_Manager_clearance():
    """Replace the in-memory ``CF_CLEARANCE`` value with the one from the JSON body.

    Responds 401 when not logged in, 400 without a ``cf_clearance`` field and
    500 on any exception. The value is stored as given.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    try:
        cf_clearance = request.json.get('cf_clearance')
        if cf_clearance:
            CONFIG["SERVER"]['CF_CLEARANCE'] = cf_clearance
            return jsonify({"success": True})
        return jsonify({"error": "cf_clearance is required"}), 400
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/v1/models', methods=['GET'])
def get_models():
    """List every configured model in the OpenAI ``/v1/models`` response shape."""
    entries = []
    for model in CONFIG["MODELS"].keys():
        entries.append({"id": model, "object": "model", "created": int(time.time()), "owned_by": "grok"})
    return jsonify({"object": "list", "data": entries})
def _create_request_headers(cookie, statsig_id):
    """Build upstream headers: defaults plus ``x-statsig-id`` and, if non-empty, ``Cookie``.

    The configured ``CF_CLEARANCE`` value is appended to ``cookie`` after a
    semicolon when it is set.
    """
    if CONFIG["SERVER"].get('CF_CLEARANCE'):
        full_cookie = f"{cookie};{CONFIG['SERVER']['CF_CLEARANCE']}"
    else:
        full_cookie = cookie
    headers = dict(DEFAULT_HEADERS)
    headers["x-statsig-id"] = statsig_id
    if full_cookie:
        headers["Cookie"] = full_cookie
    return headers
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """OpenAI-compatible chat completion endpoint backed by Grok.

    Authenticates the caller (API key, or the bearer value used directly as
    SSO cookie in single-SSO mode), picks a token with remaining quota and
    forwards the conversation upstream, retrying up to
    ``CONFIG["RETRY"]["MAX_ATTEMPTS"]`` times.

    Responses: 401 for missing/invalid credentials, 400 for a malformed body
    or unknown model, 500 (with an ``error`` object) when every attempt
    failed; otherwise a JSON completion or an SSE stream.
    """
    try:
        auth_token = request.headers.get('Authorization', '').replace('Bearer ', '')
        if not auth_token:
            return jsonify({"error": 'API key is missing'}), 401
        if CONFIG["API"]["IS_CUSTOM_SSO"]:
            token_manager.set_token(f"sso={auth_token};sso-rw={auth_token}")
        elif auth_token != CONFIG["API"]["API_KEY"]:
            return jsonify({"error": 'Unauthorized'}), 401

        # Client mistakes are answered with 400 instead of a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": {"message": "Request body must be a JSON object.", "type": "invalid_request_error"}}), 400
        model = data.get("model")
        if not isinstance(model, str) or model not in CONFIG["MODELS"]:
            return jsonify({"error": {"message": f"Unsupported model: {model}", "type": "invalid_request_error"}}), 400

        stream = data.get("stream", False)
        statsig_id = Utils.get_statsig_id()
        grok_client = GrokApiClient(model)
        # Exact payload handle_non_stream_response returns on upstream errors;
        # a substring test would misfire on answers that merely mention it.
        rate_limit_marker = json.dumps({"error": "RateLimitError"})
        max_attempts = CONFIG["RETRY"]["MAX_ATTEMPTS"]

        for attempt in range(max_attempts):
            signature_cookie = token_manager.get_next_token_for_model(model, statsig_id)
            if not signature_cookie:
                raise ValueError('No available tokens with remaining requests for this model.')
            logger.info(f"Attempt {attempt + 1}: Using token ending with ...{signature_cookie[-10:]}", "Server")
            request_headers = _create_request_headers(signature_cookie, statsig_id)
            # Fresh state per attempt so a half-processed failed attempt
            # cannot leak "thinking"/image flags into the retry.
            req_ctx = RequestContext()
            try:
                request_payload = grok_client.prepare_chat_request(data, request_headers)
                logger.debug(f"Request Payload: {json.dumps(request_payload, indent=2)}", "Server")
                proxy_options = Utils.get_proxy_options()
                response = curl_requests.post(
                    f"{CONFIG['API']['BASE_URL']}/rest/app-chat/conversations/new",
                    headers=request_headers, data=json.dumps(request_payload),
                    impersonate="chrome133a", stream=True, **proxy_options
                )
                if response.status_code == 200:
                    logger.info(f"Request successful. Active tokens for {model}: {token_manager.get_token_count_for_model(model)}", "Server")
                    if stream:
                        return Response(stream_with_context(handle_stream_response(response, model, req_ctx)), content_type='text/event-stream')
                    content = handle_non_stream_response(response, model, req_ctx)
                    if content == rate_limit_marker:
                        raise ValueError("Rate limit error detected in non-stream response.")
                    return jsonify(MessageProcessor.create_chat_response(content, model))

                if response.status_code in [403, 429]:
                    error_msg = f"Token for {model} rate limited or IP blocked (status: {response.status_code})."
                    logger.warning(f"{error_msg} Moving to expired list and trying next token...", "Server")
                    response.close()
                    # Actually retire a rate-limited token so the retry picks
                    # a different one. A 403 is not expired here: it may be an
                    # IP block rather than the token's fault. Skipped in
                    # single-SSO mode, where the pool is rebuilt per request.
                    if response.status_code == 429 and not CONFIG["API"]["IS_CUSTOM_SSO"]:
                        token_manager.move_to_expired(token_manager.normalize_model_name(model), signature_cookie, 3600)
                    if token_manager.get_token_count_for_model(model) == 0:
                        raise ValueError(f"{error_msg} All tokens for this model exhausted.")
                    continue

                logger.error(f"Token error! Status: {response.status_code}, Response: {response.text}", "Server")
                response.close()
                continue
            except Exception as e:
                logger.error(f"Request processing exception: {e}", "Server")
                if CONFIG["API"]["IS_CUSTOM_SSO"]:
                    raise
                continue
        raise ValueError(f'All {max_attempts} retry attempts failed for model {model}. Please try again later.')
    except Exception as error:
        logger.error(str(error), "ChatAPI")
        # Always an error status: the old status variable could still hold 200
        # from an earlier attempt, yielding an error body with HTTP 200.
        return jsonify({"error": {"message": str(error), "type": "server_error"}}), 500
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def catch_all(path):
    """Fallback route: answer any path not matched by another route with a liveness text."""
    status_text = 'API is running normally'
    return status_text, 200
# The route handlers reference the module-level name ``token_manager``. Create
# it (and load the configured tokens) at import time so the app also works
# when a WSGI server imports this module; previously the name existed only
# when the file was run as a script.
token_manager = AuthTokenManager()
initialization()

if __name__ == '__main__':
    # Development/Docker entry point: serve on all interfaces.
    app.run(host='0.0.0.0', port=CONFIG["SERVER"]["PORT"], debug=False)