Commit 51e985aab1 · nic · 10 months ago
Branch: document-upload-removal-layout-update

fix: created individual file handlers instead of shared to avoid race conditions; added server request timeouts
Changed files (2):
  1. apps/web/main.go (13 changes)
  2. internal/handlers/web/documents.go (290 changes)
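
Note on the race-condition fix: the removed getFileReaderForUpload (see the last hunk below) re-read the request's multipart body once per worker, and the commit message describes the resulting readers as shared and racy. The fix stages each upload in a temp file and has every worker open its own handle, so no two goroutines ever share a read offset. A minimal sketch of the per-worker-handle pattern (the file name is illustrative, not from the codebase):

package main

import (
    "io"
    "log"
    "os"
    "sync"
)

func main() {
    const tempPath = "upload.tmp" // hypothetical staged temp file

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(worker int) {
            defer wg.Done()
            // Each worker opens its own *os.File, so each read starts at
            // offset 0 and no two goroutines share a file position.
            f, err := os.Open(tempPath)
            if err != nil {
                log.Printf("worker %d: %v", worker, err)
                return
            }
            defer f.Close()
            n, _ := io.Copy(io.Discard, f)
            log.Printf("worker %d streamed %d bytes independently", worker, n)
        }(i)
    }
    wg.Wait()
}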

apps/web/main.go (13 changes)

@@ -3,6 +3,7 @@ package main
 import (
     "log"
     "net/http"
+    "time"

     root "marmic/servicetrade-toolbox"
     "marmic/servicetrade-toolbox/internal/handlers/web"
@@ -74,5 +75,15 @@ func main() {
     protected.HandleFunc("/documents/remove/bulk", web.BulkRemoveDocumentsHandler).Methods("POST")

     log.Println("Server starting on :8080")
-    log.Fatal(http.ListenAndServe(":8080", r))
+
+    // Create a custom server with appropriate timeouts
+    server := &http.Server{
+        Addr:         ":8080",
+        Handler:      r,
+        ReadTimeout:  30 * time.Minute,  // Large timeout for big file uploads
+        WriteTimeout: 30 * time.Minute,  // Large timeout for big file responses
+        IdleTimeout:  120 * time.Second, // How long to wait for the next request
+    }
+    log.Fatal(server.ListenAndServe())
 }
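
Note on the timeouts: ReadTimeout and WriteTimeout apply to every route on this server, so the 30-minute values chosen for large uploads also cover fast endpoints. A possible refinement, an assumption rather than something this commit does, is to keep short server-wide timeouts and extend the deadline only inside the upload handler via http.NewResponseController (Go 1.20+):

package main

import (
    "log"
    "net/http"
    "time"
)

// uploadHandler extends its own read deadline so a large body can stream in,
// while the rest of the server keeps a short ReadTimeout.
func uploadHandler(w http.ResponseWriter, r *http.Request) {
    rc := http.NewResponseController(w)
    if err := rc.SetReadDeadline(time.Now().Add(30 * time.Minute)); err != nil {
        log.Printf("could not extend read deadline: %v", err)
    }
    // ... process the upload from r.Body ...
    w.WriteHeader(http.StatusNoContent)
}

func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/documents/upload", uploadHandler)

    server := &http.Server{
        Addr:        ":8080",
        Handler:     mux,
        ReadTimeout: 30 * time.Second,  // short default; the upload route extends it
        IdleTimeout: 120 * time.Second,
    }
    log.Fatal(server.ListenAndServe())
}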

internal/handlers/web/documents.go (290 changes)

@@ -6,7 +6,9 @@ import (
     "fmt"
     "io"
     "log"
+    "math"
     "net/http"
+    "os"
     "path/filepath"
     "sort"
     "strings"
@@ -198,78 +200,21 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
     log.Printf("Starting document upload handler with Content-Length: %.2f MB",
         float64(r.ContentLength)/(1024*1024))

-    // Custom multipart form processing that properly streams files
-    reader, err := r.MultipartReader()
-    if err != nil {
-        log.Printf("Error getting MultipartReader: %v", err)
-        http.Error(w, "Unable to process multipart form: "+err.Error(), http.StatusBadRequest)
-        return
-    }
-
-    // Store form values
-    formValues := make(map[string]string)
-
-    // Store file metadata for later processing
-    type FileMetadata struct {
-        FormField  string
-        FileName   string
-        FileSize   int64
-        BoundaryID string
-        Type       string // Will be set later from form values
-        CustomName string // Will be set later from form values
-    }
-    var fileMetadata []FileMetadata
-
-    // First pass: read form fields only (not file contents)
-    log.Printf("First pass - collecting form fields and file metadata")
-    partIndex := 0
-    for {
-        part, err := reader.NextPart()
-        if err == io.EOF {
-            break
-        }
-        if err != nil {
-            log.Printf("Error reading part %d: %v", partIndex, err)
-            http.Error(w, "Error reading multipart form: "+err.Error(), http.StatusBadRequest)
-            return
-        }
-        partIndex++
-
-        // Get the form field name and file name (if any)
-        formName := part.FormName()
-        fileName := part.FileName()
-
-        // If not a file, read the value
-        if fileName == "" {
-            // It's a regular form field, not a file
-            valueBytes, err := io.ReadAll(part)
-            if err != nil {
-                log.Printf("Error reading form field %s: %v", formName, err)
-                continue
-            }
-            value := string(valueBytes)
-            formValues[formName] = value
-            log.Printf("Form field: %s = %s", formName, value)
-        } else if strings.HasPrefix(formName, "document-file-") {
-            // It's a file field, but don't read the content yet
-            // Just store metadata for later processing
-            log.Printf("Found file field: %s, filename: %s", formName, fileName)
-            // Use the http.DetectContentType function later when we stream the file
-            fileMetadata = append(fileMetadata, FileMetadata{
-                FormField:  formName,
-                FileName:   fileName,
-                BoundaryID: part.FormName(),
-            })
-        }
-    }
+    // Parse the multipart form with a reasonable buffer size
+    // Files larger than this will be saved to temporary files automatically
+    maxMemory := int64(32 << 20) // 32MB in memory, rest to disk
+    if err := r.ParseMultipartForm(maxMemory); err != nil {
+        log.Printf("Error parsing multipart form: %v", err)
+        http.Error(w, "Unable to parse form: "+err.Error(), http.StatusBadRequest)
+        return
+    }

     // Get job numbers from form values
-    jobNumbers := formValues["jobNumbers"]
+    jobNumbers := r.FormValue("jobNumbers")
     if jobNumbers == "" {
-        jobNumbers = formValues["job-ids"]
+        jobNumbers = r.FormValue("job-ids")
         if jobNumbers == "" {
-            log.Printf("No job numbers found in form values: %+v", formValues)
+            log.Printf("No job numbers found in form values")
             http.Error(w, "No job numbers provided", http.StatusBadRequest)
             return
         }
@@ -282,53 +227,105 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
         return
     }

-    // Second pass: enrich file metadata with document type and custom name
-    log.Printf("Second pass - enriching file metadata with document types and custom names")
-    for i, metadata := range fileMetadata {
-        suffix := strings.TrimPrefix(metadata.FormField, "document-file-")
-        nameField := "document-name-" + suffix
-        typeField := "document-type-" + suffix
-
-        // Get custom document name if provided
-        customName := formValues[nameField]
-        if customName != "" {
-            // If a custom name is provided without extension, add the original file extension
-            if !strings.Contains(customName, ".") {
-                extension := filepath.Ext(metadata.FileName)
-                if extension != "" {
-                    customName = customName + extension
-                }
-            }
-            fileMetadata[i].CustomName = customName
-        }
-
-        // Get document type
-        docType := formValues[typeField]
-        if docType == "" {
-            log.Printf("No document type for file %s, skipping", metadata.FileName)
-            continue
-        }
-        fileMetadata[i].Type = docType
-        log.Printf("Enriched metadata: %s (type: %s, custom name: %s)",
-            metadata.FileName, docType, fileMetadata[i].CustomName)
-    }
-
-    // Filter out files with no type
-    var validFiles []FileMetadata
-    for _, metadata := range fileMetadata {
-        if metadata.Type != "" {
-            validFiles = append(validFiles, metadata)
-        }
-    }
-    fileMetadata = validFiles
-    log.Printf("Total valid files to upload: %d", len(fileMetadata))
-    if len(fileMetadata) == 0 {
+    // Store file metadata
+    type FileMetadata struct {
+        FormField  string
+        FileName   string
+        Type       string
+        CustomName string
+        TempFile   string   // Path to temp file if we create one
+        FileData   []byte   // File data if small enough to keep in memory
+        File       *os.File // Open file handle if we're using a temp file
+    }
+    var filesToUpload []FileMetadata
+
+    // Process each file upload in the form
+    for formField, fileHeaders := range r.MultipartForm.File {
+        if !strings.HasPrefix(formField, "document-file-") {
+            continue
+        }
+        if len(fileHeaders) == 0 {
+            continue
+        }
+        fileHeader := fileHeaders[0]
+        if fileHeader.Filename == "" {
+            continue
+        }
+
+        // Get suffix for related form fields
+        suffix := strings.TrimPrefix(formField, "document-file-")
+        typeField := "document-type-" + suffix
+        nameField := "document-name-" + suffix
+
+        // Get document type and custom name
+        docType := r.FormValue(typeField)
+        if docType == "" {
+            log.Printf("No document type for file %s, skipping", fileHeader.Filename)
+            continue
+        }
+
+        customName := r.FormValue(nameField)
+        if customName != "" && !strings.Contains(customName, ".") {
+            ext := filepath.Ext(fileHeader.Filename)
+            if ext != "" {
+                customName = customName + ext
+            }
+        }
+
+        // Open the uploaded file
+        uploadedFile, err := fileHeader.Open()
+        if err != nil {
+            log.Printf("Error opening uploaded file %s: %v", fileHeader.Filename, err)
+            continue
+        }
+
+        // Prepare metadata
+        metadata := FileMetadata{
+            FormField:  formField,
+            FileName:   fileHeader.Filename,
+            Type:       docType,
+            CustomName: customName,
+        }
+
+        // Create a temp file for the upload (regardless of size to ensure streaming)
+        tempFile, err := os.CreateTemp("", "upload-*"+filepath.Ext(fileHeader.Filename))
+        if err != nil {
+            log.Printf("Error creating temp file for %s: %v", fileHeader.Filename, err)
+            uploadedFile.Close()
+            continue
+        }
+
+        // Copy the file content to the temp file
+        bytesCopied, err := io.Copy(tempFile, uploadedFile)
+        uploadedFile.Close()
+        if err != nil {
+            log.Printf("Error copying to temp file for %s: %v", fileHeader.Filename, err)
+            tempFile.Close()
+            os.Remove(tempFile.Name())
+            continue
+        }
+        log.Printf("Copied %d bytes of %s to temporary file: %s",
+            bytesCopied, fileHeader.Filename, tempFile.Name())
+
+        // Seek back to beginning for later reading
+        tempFile.Seek(0, 0)
+        metadata.TempFile = tempFile.Name()
+        metadata.File = tempFile // Store the open file handle
+        filesToUpload = append(filesToUpload, metadata)
+    }
+
+    if len(filesToUpload) == 0 {
         http.Error(w, "No valid documents selected for upload", http.StatusBadRequest)
         return
     }
+    log.Printf("Total valid files to upload: %d", len(filesToUpload))

     // Concurrent upload with throttling
     // ServiceTrade API allows 30s of availability per minute (approximately 15 requests at 2s each)
     const maxConcurrent = 5 // A conservative limit to avoid rate limiting
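
For reference, the throttling used by these workers is the standard counting-semaphore pattern: a buffered channel with capacity maxConcurrent gates how many upload goroutines run at once. A self-contained sketch (doUpload stands in for the ServiceTrade API call and is not a function from this codebase):

package main

import (
    "fmt"
    "sync"
    "time"
)

func doUpload(id int) {
    time.Sleep(100 * time.Millisecond) // stand-in for one API upload
    fmt.Println("finished upload", id)
}

func main() {
    const maxConcurrent = 5
    semaphore := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup

    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            semaphore <- struct{}{}        // acquire: blocks while 5 uploads are in flight
            defer func() { <-semaphore }() // release the slot when done
            doUpload(id)
        }(i)
    }
    wg.Wait()
}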
@@ -344,7 +341,7 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
         FileSize int64
     }

-    totalUploads := len(jobs) * len(fileMetadata)
+    totalUploads := len(jobs) * len(filesToUpload)
     resultsChan := make(chan UploadResult, totalUploads)

     // Create a wait group to track when all uploads are done
@@ -353,12 +350,12 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
     // Create a semaphore channel to limit concurrent uploads
    semaphore := make(chan struct{}, maxConcurrent)

-    // Third pass: Start the upload workers with proper streaming
-    log.Printf("Third pass - starting %d upload workers for %d total uploads",
+    // Start the upload workers
+    log.Printf("Starting %d upload workers for %d total uploads",
         maxConcurrent, totalUploads)

     for _, jobID := range jobs {
-        for _, metadata := range fileMetadata {
+        for _, metadata := range filesToUpload {
             wg.Add(1)

             // Launch a goroutine for each job+document combination
@@ -378,12 +375,10 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
                 fileName = metadata.CustomName
             }

-            // Create a new multipart reader for each upload to get the file part
-            log.Printf("Worker starting upload of %s to job %s", fileName, jobID)
-            fileReader, _, err := getFileReaderForUpload(r, metadata.FormField)
+            // Create a fresh file reader for each upload to avoid sharing file handles
+            fileHandle, err := os.Open(metadata.TempFile)
             if err != nil {
-                log.Printf("Error creating file reader for %s: %v", fileName, err)
+                log.Printf("Error opening temp file for %s: %v", fileName, err)
                 resultsChan <- UploadResult{
                     JobID:   jobID,
                     DocName: fileName,
@@ -393,15 +388,27 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
                 }
                 return
             }
+            defer fileHandle.Close() // Close this handle when done with this upload
+
+            // Get the expected file size for validation
+            fileInfo, statErr := os.Stat(metadata.TempFile)
+            var expectedSize int64
+            if statErr == nil {
+                expectedSize = fileInfo.Size()
+            }

-            // Create a size tracking reader
-            sizeTracker, ok := fileReader.(*readCloserWithSize)
-            if !ok {
-                log.Printf("Warning: fileReader is not a readCloserWithSize, size will not be tracked")
-            }
+            // Add jitter delay for large batch uploads (more than 10 jobs)
+            if len(jobs) > 10 {
+                jitter := time.Duration(100+(time.Now().UnixNano()%400)) * time.Millisecond
+                time.Sleep(jitter)
+            }
+
+            // Wrap with size tracker
+            sizeTracker := &readCloserWithSize{reader: fileHandle, size: 0}
+            fileReader := sizeTracker

             // Log streaming progress
-            log.Printf("Starting to stream file %s to job %s", fileName, jobID)
+            log.Printf("Starting to stream file %s to job %s from fresh file handle", fileName, jobID)

             // Call ServiceTrade API with the file reader
             uploadStart := time.Now()
@@ -414,6 +421,14 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
                 fileSize = sizeTracker.Size()
             }

+            // Verify the upload size matches the expected file size
+            sizeMatch := true
+            if expectedSize > 0 && math.Abs(float64(expectedSize-fileSize)) > float64(expectedSize)*0.05 {
+                sizeMatch = false
+                log.Printf("WARNING: Size mismatch for %s to job %s. Expected: %d, Uploaded: %d",
+                    fileName, jobID, expectedSize, fileSize)
+            }
+
             if err != nil {
                 log.Printf("Error uploading %s to job %s after %v: %v",
                     fileName, jobID, uploadDuration, err)
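
The 5% tolerance in the size check works out as follows: for a 10 MiB file, any discrepancy larger than 512 KiB marks the upload as corrupted. A worked sketch with a hypothetical helper (withinTolerance does not exist in the codebase):

package main

import (
    "fmt"
    "math"
)

// withinTolerance reports whether actual is within 5% of expected,
// mirroring the check in the handler above.
func withinTolerance(expected, actual int64) bool {
    if expected <= 0 {
        return true // no expected size recorded; skip the check
    }
    return math.Abs(float64(expected-actual)) <= float64(expected)*0.05
}

func main() {
    const tenMiB = 10 << 20
    fmt.Println(withinTolerance(tenMiB, tenMiB-400<<10)) // true: 400 KiB short, inside 5%
    fmt.Println(withinTolerance(tenMiB, tenMiB/2))       // false: half the bytes are missing
}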
@@ -424,6 +439,17 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
                     Error:    err.Error(),
                     FileSize: fileSize,
                 }
+            } else if !sizeMatch {
+                // API returned success, but we detected size mismatch
+                log.Printf("Corrupted upload of %s to job %s detected. API returned success but file sizes don't match.",
+                    fileName, jobID)
+                resultsChan <- UploadResult{
+                    JobID:    jobID,
+                    DocName:  fileName,
+                    Success:  false,
+                    Error:    "Upload appears corrupted (file size mismatch)",
+                    FileSize: fileSize,
+                }
             } else {
                 log.Printf("Successfully uploaded %s (%.2f MB) to job %s in %v",
                     fileName, float64(fileSize)/(1024*1024), jobID, uploadDuration)
@@ -439,6 +465,19 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
         }
     }

+    // Clean up temp files when all uploads are done
+    defer func() {
+        for _, metadata := range filesToUpload {
+            if metadata.File != nil {
+                metadata.File.Close()
+                if metadata.TempFile != "" {
+                    os.Remove(metadata.TempFile)
+                    log.Printf("Cleaned up temp file: %s", metadata.TempFile)
+                }
+            }
+        }
+    }()
+
     // Close the results channel when all uploads are done
     go func() {
         wg.Wait()
@@ -591,35 +630,6 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
     w.Write(resultHTML.Bytes())
 }

-// getFileReaderForUpload re-parses the multipart request to get a reader just for a specific file
-// This allows streaming the file directly from the HTTP request to the API call
-func getFileReaderForUpload(r *http.Request, formField string) (io.Reader, int64, error) {
-    mr, err := r.MultipartReader()
-    if err != nil {
-        return nil, 0, fmt.Errorf("error getting multipart reader: %w", err)
-    }
-
-    // Iterate through parts to find our file
-    for {
-        part, err := mr.NextPart()
-        if err == io.EOF {
-            break
-        }
-        if err != nil {
-            return nil, 0, fmt.Errorf("error reading multipart part: %w", err)
-        }
-
-        // Check if this is the file we're looking for
-        if part.FormName() == formField && part.FileName() != "" {
-            // We found our file, create a reader that tracks size
-            sizeTracker := &readCloserWithSize{reader: part}
-            return sizeTracker, 0, nil
-        }
-    }
-    return nil, 0, fmt.Errorf("file part %s not found in multipart form", formField)
-}
-
 // readCloserWithSize is a custom io.Reader that counts the bytes read
 type readCloserWithSize struct {
     reader io.ReadCloser
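
The diff view is truncated here, before the remaining field and methods of readCloserWithSize. Judging from how the handler uses it (Read counts the bytes streamed, Size reports the running total), a plausible completion, an assumption rather than the file's actual contents, would be:

package web

import "io"

// readCloserWithSize wraps an io.ReadCloser and counts the bytes read.
type readCloserWithSize struct {
    reader io.ReadCloser
    size   int64
}

// Read forwards to the wrapped reader and accumulates the byte count.
func (r *readCloserWithSize) Read(p []byte) (int, error) {
    n, err := r.reader.Read(p)
    r.size += int64(n)
    return n, err
}

// Close closes the wrapped reader.
func (r *readCloserWithSize) Close() error {
    return r.reader.Close()
}

// Size returns the number of bytes read so far.
func (r *readCloserWithSize) Size() int64 {
    return r.size
}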
