From e0f10d88424792867b1fe5d10becb6762d50a591 Mon Sep 17 00:00:00 2001
From: nic
Date: Tue, 15 Apr 2025 08:13:51 -0400
Subject: [PATCH] fix: streaming file data instead of loading full file into
 ram

---
 internal/api/attachments.go        | 142 ++++++++++++++++
 internal/handlers/web/documents.go | 262 ++++++++++++++++++++---------
 2 files changed, 329 insertions(+), 75 deletions(-)

diff --git a/internal/api/attachments.go b/internal/api/attachments.go
index 1ed9b4f..237a762 100644
--- a/internal/api/attachments.go
+++ b/internal/api/attachments.go
@@ -338,3 +338,145 @@ func (s *Session) GetJobAttachments(jobID string) ([]map[string]interface{}, err
 
     return result.Attachments, nil
 }
+
+// UploadAttachmentFile uploads a file as an attachment to a job using streaming (for large files)
+// Instead of loading the entire file into memory, this method streams the file directly from the reader
+func (s *Session) UploadAttachmentFile(jobID, filename, purpose string, fileReader io.Reader) (map[string]interface{}, error) {
+    url := fmt.Sprintf("%s/attachment", BaseURL)
+
+    // Create a pipe for streaming the multipart data
+    pr, pw := io.Pipe()
+
+    // Create a multipart writer
+    writer := multipart.NewWriter(pw)
+
+    // Start a goroutine to write the multipart form
+    go func() {
+        defer pw.Close() // Make sure to close the writer to signal end of the form
+
+        var formError error
+
+        // Log received values
+        log.Printf("Streaming attachment upload to job ID: %s", jobID)
+        log.Printf("Filename: %s", filename)
+        log.Printf("Purpose value: '%s'", purpose)
+
+        // The ServiceTrade API expects the purpose ID as an integer
+        purposeStr := strings.TrimSpace(purpose)
+
+        // Try to parse the purpose as an integer, removing any leading zeros first
+        purposeStr = strings.TrimLeft(purposeStr, "0")
+        if purposeStr == "" {
+            purposeStr = "0" // If only zeros were provided
+        }
+
+        purposeInt, err := strconv.Atoi(purposeStr)
+        if err != nil {
+            formError = fmt.Errorf("invalid purpose value '%s': must be a valid integer: %v", purpose, err)
+            pw.CloseWithError(formError)
+            return
+        }
+
+        log.Printf("Using purpose value: %d for job: %s", purposeInt, jobID)
+
+        // Add the purposeId (attachment type) as an integer
+        if err := writer.WriteField("purposeId", fmt.Sprintf("%d", purposeInt)); err != nil {
+            formError = fmt.Errorf("error writing purposeId field: %v", err)
+            pw.CloseWithError(formError)
+            return
+        }
+
+        // Add the entityType (3 for Job) and entityId (jobID)
+        if err := writer.WriteField("entityType", "3"); err != nil { // 3 = Job
+            formError = fmt.Errorf("error writing entityType field: %v", err)
+            pw.CloseWithError(formError)
+            return
+        }
+
+        if err := writer.WriteField("entityId", jobID); err != nil {
+            formError = fmt.Errorf("error writing entityId field: %v", err)
+            pw.CloseWithError(formError)
+            return
+        }
+
+        // Add a description field with the filename for better identification
+        if err := writer.WriteField("description", filename); err != nil {
+            formError = fmt.Errorf("error writing description field: %v", err)
+            pw.CloseWithError(formError)
+            return
+        }
+
+        // Check that filename has an extension
+        if !strings.Contains(filename, ".") {
+            formError = fmt.Errorf("filename must include an extension (e.g. .pdf, .docx) for API content type detection")
+            pw.CloseWithError(formError)
+            return
+        }
+
+        // Create the form file field
+        part, err := writer.CreateFormFile("uploadedFile", filename)
+        if err != nil {
+            formError = fmt.Errorf("error creating form file: %v", err)
+            pw.CloseWithError(formError)
+            return
+        }
+
+        // Stream the file content directly from the reader to the form
+        bytesWritten, err := io.Copy(part, fileReader)
+        if err != nil {
+            formError = fmt.Errorf("error copying file content: %v", err)
+            pw.CloseWithError(formError)
+            return
+        }
+
+        log.Printf("Streamed %d bytes to multipart form", bytesWritten)
+
+        // Close the writer to finish the multipart message
+        if err := writer.Close(); err != nil {
+            formError = fmt.Errorf("error closing multipart writer: %v", err)
+            pw.CloseWithError(formError)
+            return
+        }
+
+        log.Printf("Multipart form completed successfully")
+    }()
+
+    // Create the request with the pipe reader as the body
+    req, err := http.NewRequest("POST", url, pr)
+    if err != nil {
+        return nil, fmt.Errorf("error creating request: %v", err)
+    }
+
+    // Set headers
+    req.Header.Set("Content-Type", writer.FormDataContentType())
+    req.Header.Set("Cookie", s.Cookie)
+    // Don't set Content-Length since we're streaming and don't know the size in advance
+
+    // Send the request and wait for the response
+    resp, err := s.Client.Do(req)
+    if err != nil {
+        return nil, fmt.Errorf("error sending request: %v", err)
+    }
+    defer resp.Body.Close()
+
+    // Read the response
+    body, err := io.ReadAll(resp.Body)
+    if err != nil {
+        return nil, fmt.Errorf("error reading response body: %v", err)
+    }
+
+    // Check for errors
+    if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+        log.Printf("API error response: %s - %s", resp.Status, string(body))
+        return nil, fmt.Errorf("API returned error: %s - %s", resp.Status, string(body))
+    }
+
+    // Parse the response
+    var result map[string]interface{}
+    if err := json.Unmarshal(body, &result); err != nil {
+        return nil, fmt.Errorf("error parsing response: %v", err)
+    }
+
+    log.Printf("Successfully uploaded streaming attachment %s to job %s", filename, jobID)
+    return result, nil
+}
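The heart of the new method is the io.Pipe / multipart.Writer handoff: a goroutine writes the form fields and copies the file into the pipe while the HTTP client drains the other end, so the request body is produced incrementally instead of being built in memory. Below is a minimal, self-contained sketch of that same pattern; the endpoint, field name, and file name are placeholders, not the ServiceTrade API.

package main

import (
    "io"
    "log"
    "mime/multipart"
    "net/http"
    "os"
    "path/filepath"
)

// streamUpload POSTs a single file as multipart/form-data without buffering it
// in memory: a goroutine writes the form into one end of an io.Pipe while the
// HTTP client reads the other end as the request body.
func streamUpload(url, fieldName, path string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    pr, pw := io.Pipe()
    mw := multipart.NewWriter(pw)

    go func() {
        part, err := mw.CreateFormFile(fieldName, filepath.Base(path))
        if err != nil {
            pw.CloseWithError(err)
            return
        }
        if _, err := io.Copy(part, f); err != nil {
            pw.CloseWithError(err)
            return
        }
        // Closing the multipart writer emits the final boundary; closing the
        // pipe (with a nil error) then signals EOF to the HTTP client.
        pw.CloseWithError(mw.Close())
    }()

    resp, err := http.Post(url, mw.FormDataContentType(), pr)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    log.Printf("upload finished with status %s", resp.Status)
    return nil
}

func main() {
    if err := streamUpload("https://example.invalid/attachment", "uploadedFile", "large-report.pdf"); err != nil {
        log.Fatal(err)
    }
}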
diff --git a/internal/handlers/web/documents.go b/internal/handlers/web/documents.go
index c35d0da..b42748a 100644
--- a/internal/handlers/web/documents.go
+++ b/internal/handlers/web/documents.go
@@ -195,43 +195,53 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
         return
     }
 
-    // Custom multipart form processing for 32-bit systems
+    log.Printf("Starting document upload handler with Content-Length: %.2f MB",
+        float64(r.ContentLength)/(1024*1024))
+
+    // Custom multipart form processing that properly streams files
     reader, err := r.MultipartReader()
     if err != nil {
-        http.Error(w, "Unable to get multipart reader: "+err.Error(), http.StatusBadRequest)
+        log.Printf("Error getting MultipartReader: %v", err)
+        http.Error(w, "Unable to process multipart form: "+err.Error(), http.StatusBadRequest)
         return
     }
 
-    // Store form values and file parts
+    // Store form values
     formValues := make(map[string]string)
 
-    // Read all file contents
-    type DocumentWithContent struct {
-        Name        string
-        Type        string
-        FileContent []byte
-        FormField   string // Store the original form field name
+    // Store file metadata for later processing
+    type FileMetadata struct {
+        FormField  string
+        FileName   string
+        FileSize   int64
+        BoundaryID string
+        Type       string // Will be set later from form values
+        CustomName string // Will be set later from form values
     }
-    var docsWithContent []DocumentWithContent
+    var fileMetadata []FileMetadata
 
-    // First pass: collect all form fields and files
-    log.Printf("--- Starting multipart form processing ---")
+    // First pass: read form fields only (not file contents)
+    log.Printf("First pass - collecting form fields and file metadata")
+    partIndex := 0
     for {
         part, err := reader.NextPart()
         if err == io.EOF {
            break
         }
         if err != nil {
-            log.Printf("Error reading multipart part: %v", err)
-            break
+            log.Printf("Error reading part %d: %v", partIndex, err)
+            http.Error(w, "Error reading multipart form: "+err.Error(), http.StatusBadRequest)
+            return
         }
+        partIndex++
 
+        // Get the form field name and file name (if any)
         formName := part.FormName()
         fileName := part.FileName()
 
-        // If not a file, it's a regular form value
+        // If not a file, read the value
         if fileName == "" {
-            // Read the form field value
+            // It's a regular form field, not a file
            valueBytes, err := io.ReadAll(part)
            if err != nil {
                log.Printf("Error reading form field %s: %v", formName, err)
@@ -241,23 +251,15 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
            formValues[formName] = value
            log.Printf("Form field: %s = %s", formName, value)
         } else if strings.HasPrefix(formName, "document-file-") {
-            // It's a file upload field
-            // Read file content
-            fileContent, err := io.ReadAll(part)
-            if err != nil {
-                log.Printf("Error reading file content for %s: %v", fileName, err)
-                continue
-            }
-
-            log.Printf("Found file: %s (size: %d bytes) in field: %s",
-                fileName, len(fileContent), formName)
-
-            // Store the file with its original field name for later processing
-            docsWithContent = append(docsWithContent, DocumentWithContent{
-                Name:        fileName, // Default to original filename, will be updated with form values
-                Type:        "",       // Will be set from form values
-                FileContent: fileContent,
-                FormField:   formName,
+            // It's a file field, but don't read the content yet
+            // Just store metadata for later processing
+            log.Printf("Found file field: %s, filename: %s", formName, fileName)
+
+            // Use the http.DetectContentType function later when we stream the file
+            fileMetadata = append(fileMetadata, FileMetadata{
+                FormField:  formName,
+                FileName:   fileName,
+                BoundaryID: part.FormName(),
             })
         }
     }
@@ -274,17 +276,16 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
     }
 
     log.Printf("Job numbers: %s", jobNumbers)
-
-    // Split the job numbers
     jobs := strings.Split(jobNumbers, ",")
     if len(jobs) == 0 {
         http.Error(w, "No valid job numbers provided", http.StatusBadRequest)
         return
     }
 
-    // Second pass: process document metadata
-    for i, doc := range docsWithContent {
-        suffix := strings.TrimPrefix(doc.FormField, "document-file-")
+    // Second pass: enrich file metadata with document type and custom name
+    log.Printf("Second pass - enriching file metadata with document types and custom names")
+    for i, metadata := range fileMetadata {
+        suffix := strings.TrimPrefix(metadata.FormField, "document-file-")
         nameField := "document-name-" + suffix
         typeField := "document-type-" + suffix
 
@@ -293,38 +294,37 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
         if customName != "" {
            // If a custom name is provided without extension, add the original file extension
            if !strings.Contains(customName, ".") {
-                extension := filepath.Ext(doc.Name)
+                extension := filepath.Ext(metadata.FileName)
                if extension != "" {
                    customName = customName + extension
                }
            }
-            docsWithContent[i].Name = customName
+            fileMetadata[i].CustomName = customName
         }
 
         // Get document type
         docType := formValues[typeField]
         if docType == "" {
-            log.Printf("No document type for file %s, skipping", doc.Name)
+            log.Printf("No document type for file %s, skipping", metadata.FileName)
            continue
         }
-        docsWithContent[i].Type = docType
-        log.Printf("Processing document: %s (type: %s) from field: %s",
-            docsWithContent[i].Name, docType, doc.FormField)
+        fileMetadata[i].Type = docType
+        log.Printf("Enriched metadata: %s (type: %s, custom name: %s)",
+            metadata.FileName, docType, fileMetadata[i].CustomName)
     }
 
-    // Filter out documents with no type
-    var validDocs []DocumentWithContent
-    for _, doc := range docsWithContent {
-        if doc.Type != "" {
-            validDocs = append(validDocs, doc)
+    // Filter out files with no type
+    var validFiles []FileMetadata
+    for _, metadata := range fileMetadata {
+        if metadata.Type != "" {
+            validFiles = append(validFiles, metadata)
         }
     }
-    docsWithContent = validDocs
-
-    log.Printf("Total valid documents to upload: %d", len(docsWithContent))
+    fileMetadata = validFiles
 
-    if len(docsWithContent) == 0 {
+    log.Printf("Total valid files to upload: %d", len(fileMetadata))
+    if len(fileMetadata) == 0 {
         http.Error(w, "No valid documents selected for upload", http.StatusBadRequest)
         return
     }
 
@@ -336,14 +336,15 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
 
     // Channel for collecting results
     type UploadResult struct {
-        JobID   string
-        DocName string
-        Success bool
-        Error   string
-        Data    map[string]interface{}
+        JobID    string
+        DocName  string
+        Success  bool
+        Error    string
+        Data     map[string]interface{}
+        FileSize int64
     }
 
-    totalUploads := len(jobs) * len(docsWithContent)
+    totalUploads := len(jobs) * len(fileMetadata)
     resultsChan := make(chan UploadResult, totalUploads)
 
     // Create a wait group to track when all uploads are done
@@ -352,13 +353,16 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
     // Create a semaphore channel to limit concurrent uploads
     semaphore := make(chan struct{}, maxConcurrent)
 
-    // Start the upload workers
+    // Third pass: Start the upload workers with proper streaming
+    log.Printf("Third pass - starting %d upload workers for %d total uploads",
+        maxConcurrent, totalUploads)
+
     for _, jobID := range jobs {
-        for _, doc := range docsWithContent {
+        for _, metadata := range fileMetadata {
            wg.Add(1)
 
            // Launch a goroutine for each job+document combination
-            go func(jobID string, doc DocumentWithContent) {
+            go func(jobID string, metadata FileMetadata) {
                defer wg.Done()
 
                // Acquire a semaphore slot
@@ -368,27 +372,70 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
                // Add a small delay to avoid overwhelming the API
                time.Sleep(requestDelay)
 
-                // Call the ServiceTrade API
-                result, err := session.UploadAttachment(jobID, doc.Name, doc.Type, doc.FileContent)
+                // Get the file name to use (custom name or original)
+                fileName := metadata.FileName
+                if metadata.CustomName != "" {
+                    fileName = metadata.CustomName
+                }
+
+                // Create a new multipart reader for each upload to get the file part
+                log.Printf("Worker starting upload of %s to job %s", fileName, jobID)
+                fileReader, _, err := getFileReaderForUpload(r, metadata.FormField)
 
                if err != nil {
-                    log.Printf("Error uploading %s to job %s: %v", doc.Name, jobID, err)
+                    log.Printf("Error creating file reader for %s: %v", fileName, err)
                    resultsChan <- UploadResult{
-                        JobID:   jobID,
-                        DocName: doc.Name,
-                        Success: false,
-                        Error:   err.Error(),
+                        JobID:    jobID,
+                        DocName:  fileName,
+                        Success:  false,
+                        Error:    fmt.Sprintf("Error preparing file: %v", err),
+                        FileSize: 0,
+                    }
+                    return
+                }
+
+                // Create a size tracking reader
+                sizeTracker, ok := fileReader.(*readCloserWithSize)
+                if !ok {
+                    log.Printf("Warning: fileReader is not a readCloserWithSize, size will not be tracked")
+                }
+
+                // Log streaming progress
+                log.Printf("Starting to stream file %s to job %s", fileName, jobID)
+
+                // Call ServiceTrade API with the file reader
+                uploadStart := time.Now()
+                result, err := session.UploadAttachmentFile(jobID, fileName, metadata.Type, fileReader)
+                uploadDuration := time.Since(uploadStart)
+
+                // Get the actual size that was uploaded
+                var fileSize int64
+                if sizeTracker != nil {
+                    fileSize = sizeTracker.Size()
+                }
+
+                if err != nil {
+                    log.Printf("Error uploading %s to job %s after %v: %v",
+                        fileName, jobID, uploadDuration, err)
+                    resultsChan <- UploadResult{
+                        JobID:    jobID,
+                        DocName:  fileName,
+                        Success:  false,
+                        Error:    err.Error(),
+                        FileSize: fileSize,
                    }
                } else {
-                    log.Printf("Successfully uploaded %s to job %s", doc.Name, jobID)
+                    log.Printf("Successfully uploaded %s (%.2f MB) to job %s in %v",
+                        fileName, float64(fileSize)/(1024*1024), jobID, uploadDuration)
                    resultsChan <- UploadResult{
-                        JobID:   jobID,
-                        DocName: doc.Name,
-                        Success: true,
-                        Data:    result,
+                        JobID:    jobID,
+                        DocName:  fileName,
+                        Success:  true,
+                        Data:     result,
+                        FileSize: fileSize,
                    }
                }
-            }(jobID, doc)
+            }(jobID, metadata)
         }
     }
@@ -400,13 +447,28 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
 
     // Collect results
     results := make(map[string][]UploadResult)
+    resultsCount := 0
+    var totalBytesUploaded int64
+
     for result := range resultsChan {
+        resultsCount++
+        log.Printf("Received result %d/%d: Job %s, File %s, Success: %v, Size: %.2f MB",
+            resultsCount, totalUploads, result.JobID, result.DocName, result.Success,
+            float64(result.FileSize)/(1024*1024))
+
+        if result.Success {
+            totalBytesUploaded += result.FileSize
+        }
+
         if _, exists := results[result.JobID]; !exists {
            results[result.JobID] = []UploadResult{}
         }
         results[result.JobID] = append(results[result.JobID], result)
     }
 
+    log.Printf("All results collected. Total: %d, Total bytes uploaded: %.2f MB",
+        resultsCount, float64(totalBytesUploaded)/(1024*1024))
+
     // Generate HTML for results
     var resultHTML bytes.Buffer
 
@@ -529,6 +591,56 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
     w.Write(resultHTML.Bytes())
 }
 
+// getFileReaderForUpload re-parses the multipart request to get a reader just for a specific file
+// This allows streaming the file directly from the HTTP request to the API call
+func getFileReaderForUpload(r *http.Request, formField string) (io.Reader, int64, error) {
+    mr, err := r.MultipartReader()
+    if err != nil {
+        return nil, 0, fmt.Errorf("error getting multipart reader: %w", err)
+    }
+
+    // Iterate through parts to find our file
+    for {
+        part, err := mr.NextPart()
+        if err == io.EOF {
+            break
+        }
+        if err != nil {
+            return nil, 0, fmt.Errorf("error reading multipart part: %w", err)
+        }
+
+        // Check if this is the file we're looking for
+        if part.FormName() == formField && part.FileName() != "" {
+            // We found our file, create a reader that tracks size
+            sizeTracker := &readCloserWithSize{reader: part}
+            return sizeTracker, 0, nil
+        }
+    }
+
+    return nil, 0, fmt.Errorf("file part %s not found in multipart form", formField)
+}
+
+// readCloserWithSize is a custom io.Reader that counts the bytes read
+type readCloserWithSize struct {
+    reader io.ReadCloser
+    size   int64
+}
+
+func (r *readCloserWithSize) Read(p []byte) (n int, err error) {
+    n, err = r.reader.Read(p)
+    r.size += int64(n)
+    return n, err
+}
+
+func (r *readCloserWithSize) Close() error {
+    return r.reader.Close()
+}
+
+// Size returns the current size of data read
+func (r *readCloserWithSize) Size() int64 {
+    return r.size
+}
+
 // DocumentFieldAddHandler generates a new document field for the form
 func DocumentFieldAddHandler(w http.ResponseWriter, r *http.Request) {
     // Generate a random ID for the new field
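From a caller's perspective, the patch replaces the buffered UploadAttachment (which takes the file contents as a byte slice) with the streaming UploadAttachmentFile (which takes an io.Reader). The sketch below is a hedged illustration of that difference, assuming an already-authenticated *api.Session created elsewhere; the helper name, parameters, and the api package import path are assumptions, not part of this patch.

// Illustrative only: uploadLargeFile is a hypothetical helper, not part of the
// patch. It assumes an authenticated *api.Session and standard library imports
// (os, path/filepath); the module import path for the api package is not shown
// in this patch, so imports are abbreviated here.
func uploadLargeFile(session *api.Session, jobID, purposeID, path string) error {
    // Before this patch the whole file had to be read into memory, e.g.:
    //   data, _ := os.ReadFile(path)
    //   session.UploadAttachment(jobID, filepath.Base(path), purposeID, data)

    // With this patch, the open file handle is streamed through an io.Pipe into
    // the multipart request body instead of being buffered as a []byte.
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    _, err = session.UploadAttachmentFile(jobID, filepath.Base(path), purposeID, f)
    return err
}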