Commit 09b84c6f authored by 水玉婷's avatar 水玉婷
Browse files

feat:还原重连机制

parent e1ea9e38
...@@ -253,48 +253,6 @@ const handleSSEMessage = (data: SSEData) => { ...@@ -253,48 +253,6 @@ const handleSSEMessage = (data: SSEData) => {
} }
}; };
// 添加网络状态模拟消息函数(带去重机制和样式区分)
const lastNetworkStatusMessage = ref<{type: string, message: string, timestamp: number} | null>(null);
const addNetworkStatusMessage = (type: 'error' | 'success', message: string) => {
const now = Date.now();
// 去重逻辑:相同类型和内容的消息在5秒内不重复显示
if (lastNetworkStatusMessage.value &&
lastNetworkStatusMessage.value.type === type &&
lastNetworkStatusMessage.value.message === message &&
now - lastNetworkStatusMessage.value.timestamp < 5000) {
console.log(`📡 ${type === 'error' ? '' : ''} 跳过重复网络状态消息:`, message);
return;
}
lastNetworkStatusMessage.value = { type, message, timestamp: now };
const statusMessage: Message = {
messageType: 'received', // 改回原来的receive类型
avatar: 'AI',
recordId: '',
promptTokens: 0,
completionTokens: 0,
totalTokens: 0,
date: dayjs().format('HH:mm'),
contentBlocks: [{
content: type === 'error' ?
templateService.generateErrorTemplate(message) :
templateService.generateSuccessTemplate(message),
thinkContent: '',
hasThinkBox: false,
thinkBoxExpanded: false,
}]
};
messages.value.push(statusMessage);
nextTick(() => {
scrollToBottom();
});
console.log(`📡 ${type === 'error' ? '' : ''} 网络状态消息:`, message);
};
// 创建SSE服务实例 // 创建SSE服务实例
const sseService = createSSEService({ const sseService = createSSEService({
apiBaseUrl: props.apiBaseUrl, apiBaseUrl: props.apiBaseUrl,
...@@ -304,59 +262,17 @@ const sseService = createSSEService({ ...@@ -304,59 +262,17 @@ const sseService = createSSEService({
}, { }, {
onMessage: handleSSEMessage, onMessage: handleSSEMessage,
onError: (error) => { onError: (error) => {
console.error('SSE连接错误:', error); console.error('SSE error:', error);
isAIResponding.value = false; isAIResponding.value = false;
isInThinkingMode.value = false; isInThinkingMode.value = false;
closeSSE();
// 关键修改:SSE连接错误时也重置AI响应状态,确保重连后重新开始对话块
currentAIResponse.value = null;
currentBlockIndex.value = -1;
// 优化:只在没有网络断开消息的情况下显示SSE错误消息
if (!lastNetworkStatusMessage.value || lastNetworkStatusMessage.value.type !== 'error') {
addNetworkStatusMessage('error', '服务连接异常,正在尝试重新连接...');
}
console.log('🔄 等待自动重连...');
// 不再手动关闭SSE,让重连逻辑自动处理
// closeSSE();
}, },
onOpen: (event) => { onOpen: (event) => {
console.log('✅ SSE连接已建立', event); console.log('SSE连接已建立', event);
// 只在真正需要时显示连接成功消息
// 避免在正常连接时也显示恢复消息
if (lastNetworkStatusMessage.value?.type === 'error') {
addNetworkStatusMessage('success', '服务连接已恢复,可以正常对话了!');
}
}, },
onReconnect: (newDialogSessionId) => { onReconnect: (newDialogSessionId) => {
console.log('🔄 SSE重连成功,新的dialogSessionId:', newDialogSessionId); console.log('SSE重连成功,新的dialogSessionId:', newDialogSessionId);
dialogSessionId.value = newDialogSessionId; dialogSessionId.value = newDialogSessionId;
// 标记重连成功,避免心跳检测重复触发重连
sseService.markReconnectSuccess();
// 优化:只在有错误消息的情况下显示重连成功消息
if (lastNetworkStatusMessage.value?.type === 'error') {
addNetworkStatusMessage('success', '服务重连成功,对话已恢复!');
}
},
onNetworkOffline: () => {
console.log('📡 网络断开事件触发');
// 网络断开时添加错误消息
addNetworkStatusMessage('error', '网络连接已断开,正在尝试重新连接...');
// 关键修改:网络断开时重置AI响应状态,确保网络恢复后重新开始对话块
console.log('💡 网络断开,重置AI响应状态,准备网络恢复时重新开始对话块');
isAIResponding.value = false;
isInThinkingMode.value = false;
currentAIResponse.value = null;
currentBlockIndex.value = -1;
},
onNetworkOnline: () => {
console.log('🌐 网络恢复事件触发');
// 优化:网络恢复事件只记录日志,不显示消息,避免与重连成功消息重复
// 真正的恢复消息由onReconnect处理
} }
}); });
...@@ -436,6 +352,18 @@ const sendMessage = async (type: MessageType = 'text', params: MessageParams = { ...@@ -436,6 +352,18 @@ const sendMessage = async (type: MessageType = 'text', params: MessageParams = {
// 开始对话 // 开始对话
startConversation(); startConversation();
// 检查SSE连接状态,如果连接断开则触发重连
const connectionState = sseService.getConnectionState();
console.log('SSE连接状态:', connectionState);
// EventSource readyState: 0=CONNECTING, 1=OPEN, 2=CLOSED
if (connectionState === 2) { // 连接已关闭
console.log('💡 检测到SSE连接已断开,触发重连');
if (dialogSessionId.value) {
sseService.reconnectSSE(dialogSessionId.value);
}
}
isAIResponding.value = false; isAIResponding.value = false;
isInThinkingMode.value = false; isInThinkingMode.value = false;
currentAIResponse.value = null; currentAIResponse.value = null;
...@@ -544,7 +472,6 @@ const closeSSE = () => { ...@@ -544,7 +472,6 @@ const closeSSE = () => {
// 初始化SSE连接 // 初始化SSE连接
const initSSE = () => { const initSSE = () => {
console.log('🔗 初始化SSE连接,当前会话ID:', dialogSessionId.value || '');
sseService.initSSE(dialogSessionId.value); sseService.initSSE(dialogSessionId.value);
}; };
...@@ -650,13 +577,6 @@ defineExpose({ ...@@ -650,13 +577,6 @@ defineExpose({
// 生命周期 // 生命周期
onMounted(() => { onMounted(() => {
console.log('组件挂载,初始 dialogSessionId:', props.dialogSessionId); console.log('组件挂载,初始 dialogSessionId:', props.dialogSessionId);
// 添加防重复初始化检查
if (hasStartedConversation.value) {
console.log('⚠️ 检测到重复初始化,跳过SSE初始化');
return;
}
initSSE(); initSSE();
scrollToBottom(); scrollToBottom();
if (props.dialogSessionId) { if (props.dialogSessionId) {
...@@ -709,7 +629,6 @@ const simulateLineChartMessage = () => { ...@@ -709,7 +629,6 @@ const simulateLineChartMessage = () => {
// 调用handleSSEMessage处理模拟消息 // 调用handleSSEMessage处理模拟消息
handleSSEMessage(simulatedMessage); handleSSEMessage(simulatedMessage);
console.log('折线图消息已发送,使用正确的SSE格式:status=3, type=2');
}; };
</script> </script>
<style lang="less" scoped> <style lang="less" scoped>
......
...@@ -25,8 +25,6 @@ export interface SSEHandlers { ...@@ -25,8 +25,6 @@ export interface SSEHandlers {
onError?: (error: any) => void; onError?: (error: any) => void;
onOpen?: (event: any) => void; onOpen?: (event: any) => void;
onReconnect?: (newDialogSessionId: string) => void; onReconnect?: (newDialogSessionId: string) => void;
onNetworkOffline?: () => void; // 网络断开事件
onNetworkOnline?: () => void; // 网络恢复事件
} }
// SSE服务类 - 专注于SSE连接管理 // SSE服务类 - 专注于SSE连接管理
...@@ -36,37 +34,19 @@ export class SSEService { ...@@ -36,37 +34,19 @@ export class SSEService {
private handlers: SSEHandlers; private handlers: SSEHandlers;
private isReconnecting: Ref<boolean> = ref(false); private isReconnecting: Ref<boolean> = ref(false);
private timeArr: NodeJS.Timeout[] = []; private timeArr: NodeJS.Timeout[] = [];
private reconnectAttempts: number = 0;
private maxReconnectAttempts: number = 5;
private currentDialogSessionId: string = '';
private connectionMonitorInterval: NodeJS.Timeout | null = null;
// 心跳检测配置 // keepalive相关状态
private readonly HEARTBEAT_INTERVAL = 30000; // 30秒检测一次 private lastKeepaliveTime: number = 0; // 记录最近收到keepalive事件的时间
private readonly RECONNECT_DELAY_BASE = 3000; // 3秒基础重连延迟 private hasReceivedKeepalive: boolean = false; // 标记是否收到过keepalive事件
private readonly MAX_RECONNECT_DELAY = 60000; // 最大重连延迟60秒
private readonly MAX_RECONNECT_ATTEMPTS = 5; // 最大重连次数
constructor(config: SSEServiceConfig, handlers: SSEHandlers = {}) { constructor(config: SSEServiceConfig, handlers: SSEHandlers = {}) {
this.config = config; this.config = config;
this.handlers = handlers; this.handlers = handlers;
// 监听网络状态变化
if (typeof window !== 'undefined') {
window.addEventListener('online', this.handleNetworkOnline.bind(this));
window.addEventListener('offline', this.handleNetworkOffline.bind(this));
}
} }
// 初始化SSE连接 // 初始化SSE连接
public initSSE(dialogSessionId: string): void { public initSSE(dialogSessionId: string): void {
try { try {
this.currentDialogSessionId = dialogSessionId;
this.reconnectAttempts = 0; // 重置重连次数
console.log('🔗 开始建立SSE连接...', '会话ID:', dialogSessionId || '');
// 即使会话ID为空,也尝试建立连接
// 服务器可能会返回新的会话ID,或者允许空会话ID的连接
const url = `${this.config.apiBaseUrl}/aiService/sse/join/${this.config.params?.stage || ''}?app-id=${this.config.params?.appId || ''}&dialog-session-id=${dialogSessionId || ''}`; const url = `${this.config.apiBaseUrl}/aiService/sse/join/${this.config.params?.stage || ''}?app-id=${this.config.params?.appId || ''}&dialog-session-id=${dialogSessionId || ''}`;
this.eventSource = new EventSourcePolyfill(url, { this.eventSource = new EventSourcePolyfill(url, {
headers: { headers: {
...@@ -75,18 +55,12 @@ export class SSEService { ...@@ -75,18 +55,12 @@ export class SSEService {
'x-app-code': this.config.appCode || '', 'x-app-code': this.config.appCode || '',
}, },
withCredentials: true, withCredentials: true,
connectionTimeout: 60000 connectionTimeout: 60000,
heartbeatTimeout: 60000,
}); });
this.eventSource.onopen = (event) => { this.eventSource.onopen = (event) => {
this.reconnectAttempts = 0; // 连接成功时重置重连次数 // 移除这里的日志,只在外部处理器中打印
this.isReconnecting.value = false;
console.log('✅ SSE连接已建立');
// 启动心跳检测
this.startHeartbeatCheck();
if (this.handlers.onOpen) { if (this.handlers.onOpen) {
this.handlers.onOpen(event); this.handlers.onOpen(event);
} }
...@@ -96,9 +70,6 @@ export class SSEService { ...@@ -96,9 +70,6 @@ export class SSEService {
try { try {
const data: SSEData = JSON.parse(event.data); const data: SSEData = JSON.parse(event.data);
// 重置重连次数,因为收到了有效消息
this.reconnectAttempts = 0;
// 只传递原始数据,模板处理在外部进行 // 只传递原始数据,模板处理在外部进行
if (this.handlers.onMessage) { if (this.handlers.onMessage) {
this.handlers.onMessage(data); this.handlers.onMessage(data);
...@@ -108,35 +79,51 @@ export class SSEService { ...@@ -108,35 +79,51 @@ export class SSEService {
} }
}); });
// 监听keepalive事件
this.eventSource.addEventListener('keepalive', (event) => {
this.lastKeepaliveTime = Date.now();
this.hasReceivedKeepalive = true; // 标记已收到keepalive
console.log('💓 收到keepalive事件,连接正常');
});
this.eventSource.onerror = (error) => { this.eventSource.onerror = (error) => {
console.error('❌ SSE连接错误:', error); console.error('SSE error:', error);
// 检查是否为"No activity"错误 - 使用更宽松的检测条件
const errorString = String(error).toLowerCase();
// 检测各种可能的"No activity"错误变体
const isNoActivityError =
errorString.includes('no activity') ||
errorString.includes('no activity within') ||
errorString.includes('milliseconds') && errorString.includes('reconnecting') ||
errorString.includes('120000') && errorString.includes('reconnecting');
// 如果是"No activity"错误且收到过keepalive,说明连接正常,忽略错误
if (isNoActivityError && this.hasReceivedKeepalive) {
this.hasReceivedKeepalive = false; // 重置状态,等待下一次keepalive
return; // 不关闭连接,不触发重连,完全退出错误处理
}
// 如果只是"No activity"错误但没有收到keepalive,也尝试忽略(可能是误报)
if (isNoActivityError) {
return; // 不关闭连接,不触发重连
}
// 如果不是"No activity"错误,则执行正常的错误处理
if (this.handlers.onError) { if (this.handlers.onError) {
this.handlers.onError(error); this.handlers.onError(error);
} }
this.closeSSE(); this.closeSSE();
// 检测服务器不可用情况 // 添加错误重连逻辑
const isServerUnavailable = this.isServerUnavailableError(error); if (!this.isReconnecting.value) {
if (isServerUnavailable) {
console.error('⛔ 服务器不可用,停止重连');
// 重置重连状态,避免后续重连
this.reconnectAttempts = this.MAX_RECONNECT_ATTEMPTS;
return;
}
// 简单重连逻辑
if (!this.isReconnecting.value && this.reconnectAttempts < this.MAX_RECONNECT_ATTEMPTS) {
this.reconnectAttempts++;
const delay = this.calculateReconnectDelay();
console.log(`🔄 SSE连接断开,将在 ${delay}ms 后尝试第 ${this.reconnectAttempts} 次重连`);
setTimeout(() => { setTimeout(() => {
if (this.currentDialogSessionId) { if (dialogSessionId) {
this.reconnectSSE(this.currentDialogSessionId); this.reconnectSSE(dialogSessionId);
} }
}, delay); }, 3000);
} }
}; };
...@@ -148,45 +135,25 @@ export class SSEService { ...@@ -148,45 +135,25 @@ export class SSEService {
// 重新连接SSE // 重新连接SSE
public reconnectSSE(newDialogSessionId: string): void { public reconnectSSE(newDialogSessionId: string): void {
if (this.isReconnecting.value) { if (this.isReconnecting.value) {
console.log('⏳ 正在重连中,跳过重复重连');
return;
}
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('⛔ 重连次数已达上限,停止重连');
return; return;
} }
this.isReconnecting.value = true; this.isReconnecting.value = true;
console.log(`🔄 开始重连SSE(第${this.reconnectAttempts}次)`, '会话ID:', newDialogSessionId || ''); console.log('开始重连SSE,新的dialogSessionId:', newDialogSessionId);
// 只关闭SSE连接,但不停止心跳检测 this.closeSSE();
if (this.eventSource) {
try {
this.eventSource.close();
this.eventSource = null;
} catch (err) {
console.warn('关闭SSE连接时出错:', err);
}
}
const reconnectTimeout = setTimeout(() => { const reconnectTimeout = setTimeout(() => {
this.initSSE(newDialogSessionId); this.initSSE(newDialogSessionId);
// 重连状态会在连接成功后自动重置 setTimeout(() => {
}, 100); this.isReconnecting.value = false;
}, 2000);
}, 500);
this.timeArr.push(reconnectTimeout); this.timeArr.push(reconnectTimeout);
// 关键修复:移除这里的onReconnect回调,真正的重连成功应该在onopen事件中触发 if (this.handlers.onReconnect) {
// 这样只有在连接真正建立成功时才会显示重连成功消息 this.handlers.onReconnect(newDialogSessionId);
console.log('⏳ 重连已启动,等待连接建立...'); }
}
// 标记重连成功(供外部调用)
public markReconnectSuccess(): void {
this.isReconnecting.value = false;
this.reconnectAttempts = 0;
console.log('✅ 重连成功,重置重连状态');
} }
// 关闭SSE连接 // 关闭SSE连接
...@@ -200,17 +167,11 @@ export class SSEService { ...@@ -200,17 +167,11 @@ export class SSEService {
} }
} }
// 停止心跳检测
this.stopHeartbeatCheck();
// 清理所有定时器 // 清理所有定时器
this.timeArr.forEach(timeout => { this.timeArr.forEach(timeout => {
clearTimeout(timeout); clearTimeout(timeout);
}); });
this.timeArr = []; this.timeArr = [];
// 重置重连状态
this.isReconnecting.value = false;
} }
// 获取重连状态 // 获取重连状态
...@@ -226,187 +187,6 @@ export class SSEService { ...@@ -226,187 +187,6 @@ export class SSEService {
return this.eventSource.readyState; return this.eventSource.readyState;
} }
// 计算重连延迟(指数退避策略)
private calculateReconnectDelay(): number {
// 指数退避:3s, 6s, 12s, 24s, 48s
return Math.min(this.RECONNECT_DELAY_BASE * Math.pow(2, this.reconnectAttempts - 1), this.MAX_RECONNECT_DELAY);
}
// 判断是否为服务器不可用错误
private isServerUnavailableError(error: any): boolean {
if (!error) return false;
// 检查错误消息
const errorMessage = String(error.message || '').toLowerCase();
const errorString = String(error).toLowerCase();
// 检查错误目标的状态
const errorTarget = error.target || error;
const readyState = errorTarget.readyState;
// 服务器不可用的明确特征:
// 1. 连接被拒绝 (net::ERR_CONNECTION_REFUSED)
// 2. 服务器返回5xx错误
// 3. 明确的连接失败消息
const isConnectionRefused = errorMessage.includes('connection_refused') ||
errorMessage.includes('connection refused') ||
errorMessage.includes('failed to fetch') ||
errorString.includes('connection_refused') ||
errorString.includes('connection refused');
const isServerError = errorMessage.includes('500') ||
errorMessage.includes('502') ||
errorMessage.includes('503') ||
errorMessage.includes('504') ||
errorString.includes('500') ||
errorString.includes('502') ||
errorString.includes('503') ||
errorString.includes('504');
// 只有当明确是服务器不可用时才停止重连
// 避免将正常的网络波动误判为服务器不可用
const isUnavailable = isConnectionRefused || isServerError;
if (isUnavailable) {
console.log('🔍 检测到服务器不可用错误:', {
message: errorMessage,
string: errorString,
readyState: readyState,
isConnectionRefused,
isServerError
});
}
return isUnavailable;
}
// 处理网络恢复
private handleNetworkOnline(): void {
console.log('🌐 网络已恢复,检查SSE连接状态');
console.log('📊 当前状态 - 重连中:', this.isReconnecting.value, '会话ID:', this.currentDialogSessionId, '重连次数:', this.reconnectAttempts);
// 检查当前连接状态
const isConnected = this.eventSource && this.eventSource.readyState === 1;
if (isConnected) {
console.log('✅ 网络恢复但SSE连接正常,无需重连');
// 即使连接正常,也触发网络恢复回调,但只在真正需要时显示消息
if (this.handlers.onNetworkOnline) {
this.handlers.onNetworkOnline();
}
return;
}
// 如果不在重连中且连接异常,尝试重新连接
if (!this.isReconnecting.value) {
// 无论之前是否有会话ID,都重新创建连接
console.log('🔄 网络恢复,创建新SSE会话');
this.reconnectSSE('');
} else {
console.log('⚠️ 网络恢复但未触发重连 - 重连中:', this.isReconnecting.value, '会话ID存在:', !!this.currentDialogSessionId);
console.log('💡 等待重连完成或心跳检测处理连接状态');
}
// 触发网络恢复回调
if (this.handlers.onNetworkOnline) {
this.handlers.onNetworkOnline();
}
}
// 处理网络断开
private handleNetworkOffline(): void {
console.log('🌐 网络已断开,关闭SSE连接');
// 保存当前的会话ID,保持会话连续性
console.log('💾 当前会话ID:', this.currentDialogSessionId || '');
// 关闭SSE连接
if (this.eventSource) {
try {
this.eventSource.close();
this.eventSource = null;
} catch (err) {
console.warn('关闭SSE连接时出错:', err);
}
}
// 停止心跳检测
this.stopHeartbeatCheck();
// 重置重连状态和重连次数
this.isReconnecting.value = false;
this.reconnectAttempts = 0;
// 关键修改:保持会话ID不变,但标记网络断开状态
// 这样网络恢复时可以继续使用同一个会话,但需要重新开始对话块
console.log('💡 网络断开,保持会话ID,准备网络恢复时重新开始对话块');
// 网络断开时启动心跳检测,以便网络恢复时能立即发现
this.startHeartbeatCheck();
// 触发网络断开回调
if (this.handlers.onNetworkOffline) {
this.handlers.onNetworkOffline();
}
}
// 启动心跳检测
private startHeartbeatCheck(): void {
// 停止之前的心跳检测
this.stopHeartbeatCheck();
console.log(`💓 心跳检测已启动(${this.HEARTBEAT_INTERVAL/1000}秒间隔)`);
// 使用固定间隔开始检测
this.connectionMonitorInterval = setInterval(() => {
this.performHeartbeatCheck();
}, this.HEARTBEAT_INTERVAL);
}
// 执行心跳检测
private performHeartbeatCheck(): void {
// 检查网络状态
const isOnline = typeof window !== 'undefined' && window.navigator.onLine;
if (!isOnline) {
console.log('💓 SSE心跳检测 - 网络已断开,等待网络恢复');
return;
}
if (this.eventSource && this.eventSource.readyState === 1) {
// 连接正常,记录心跳
console.log('💓 SSE心跳检测正常 - 连接状态:', this.eventSource.readyState);
} else {
console.log('💓 SSE心跳检测 - 连接状态:', this.eventSource ? this.eventSource.readyState : '未连接');
// 如果未连接或连接异常,但网络正常,尝试重连
if (!this.isReconnecting.value) {
if (this.currentDialogSessionId) {
console.log('💔 检测到SSE连接异常,尝试重连');
this.reconnectSSE(this.currentDialogSessionId);
} else {
console.log('💡 检测到网络已恢复但无会话ID,等待用户交互或网络恢复处理');
// 不再自动触发重连,避免重复消息
// 网络恢复事件会由网络状态监听器自动处理
}
} else {
// 正在重连中,避免重复触发
console.log('⏳ 心跳检测 - 正在重连中,跳过本次检测');
}
}
}
// 停止心跳检测
private stopHeartbeatCheck(): void {
if (this.connectionMonitorInterval) {
clearInterval(this.connectionMonitorInterval);
this.connectionMonitorInterval = null;
console.log('💓 基础心跳检测已停止');
}
}
// 清理资源 // 清理资源
public destroy(): void { public destroy(): void {
this.closeSSE(); this.closeSSE();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment