diff --git a/backend/app/services/training_monitor.py b/backend/app/services/training_monitor.py index 5f741eb..33b028d 100644 --- a/backend/app/services/training_monitor.py +++ b/backend/app/services/training_monitor.py @@ -21,11 +21,13 @@ class TrainingMonitor: self.sftp_client = None self._running = False self._monitor_task = None - self.recent_logs: List[str] = [] # Store recent log lines - self.max_log_lines: int = 100 # Keep last 100 lines + self.recent_logs: List[str] = [] + self.max_log_lines: int = 500 self.current_status: Optional[TrainingStatus] = None self.remote_path = settings.TRAINING_LOG_REMOTE_PATH if hasattr(settings, 'TRAINING_LOG_REMOTE_PATH') else None self.local_path = settings.TRAINING_LOG_LOCAL_PATH if hasattr(settings, 'TRAINING_LOG_LOCAL_PATH') else None + self._file_handle = None + self._last_position = 0 def _parse_tqdm_line(self, line: str) -> Optional[TrainingStatus]: """Parse tqdm output line into TrainingStatus""" @@ -87,30 +89,91 @@ class TrainingMonitor: logger.error(f"Error reading local log: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) + async def _open_log_file(self): + """Open and maintain file handle""" + if self.remote_path: + if not self.sftp_client: + await self._connect_sftp() + self._file_handle = self.sftp_client.open(self.remote_path, 'rb') + else: + self._file_handle = await aiofiles.open(self.local_path, 'rb') + + async def _read_new_content(self) -> str: + """Read only new content since last read""" + if not self._file_handle: + await self._open_log_file() + + try: + # Get file size + if self.remote_path: + self._file_handle.seek(0, 2) # Seek to end + file_size = self._file_handle.tell() + else: + file_size = os.path.getsize(self.local_path) + + if file_size < self._last_position: + # File has been truncated/rotated + logger.info("Log file has been truncated, reading from start") + self._last_position = 0 + + # Seek to last position + self._file_handle.seek(self._last_position) + + # Read new content + new_content = self._file_handle.read() + if isinstance(new_content, bytes): + new_content = new_content.decode('utf-8') + + # Update position + self._last_position = file_size + + return new_content + + except Exception as e: + logger.error(f"Error reading log: {str(e)}") + # Try to reopen the file on error + await self._reopen_log_file() + return "" + + async def _reopen_log_file(self): + """Reopen file handle in case of errors""" + try: + if self._file_handle: + self._file_handle.close() + except Exception: + pass + self._file_handle = None + await self._open_log_file() + + async def _monitor_log(self): """Monitor log file for updates""" while self._running: try: - content = (await self._read_remote_log() if self.remote_path - else await self._read_local_log()) + new_content = await self._read_new_content() + if new_content: + # Process new lines + new_lines = new_content.splitlines() + if new_lines: + # Update recent logs + self.recent_logs.extend(new_lines) + self.recent_logs = self.recent_logs[-self.max_log_lines:] - # Get last line containing progress info - lines = content.splitlines() - self.recent_logs = lines[-self.max_log_lines:] if lines else [] - for line in reversed(lines): - if '|' in line: # Basic check for tqdm output - status = self._parse_tqdm_line(line) - if status: - self.current_status = status - break + # Update status from last progress line + for line in reversed(new_lines): + if '|' in line: + status = self._parse_tqdm_line(line) + if status: + self.current_status = status + break - await asyncio.sleep(1) # Check every second + await asyncio.sleep(1) except asyncio.CancelledError: break except Exception as e: logger.error(f"Monitor error: {str(e)}") - await asyncio.sleep(5) # Wait before retry + await asyncio.sleep(5) async def get_log(self, lines: int = 50) -> List[str]: """Get recent log entries""" @@ -134,6 +197,15 @@ class TrainingMonitor: await self._monitor_task except asyncio.CancelledError: pass + + # Close file handle + if self._file_handle: + try: + self._file_handle.close() + except Exception: + pass + self._file_handle = None + logger.info("Training monitor stopped") async def get_status(self) -> Optional[TrainingStatus]: