
fix: stream file data instead of loading the full file into RAM

document-upload-removal-layout-update
nic · 10 months ago
commit e0f10d8842
  1. internal/api/attachments.go (142 changed lines)
  2. internal/handlers/web/documents.go (262 changed lines)
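
The core change is a producer/consumer pipe: a goroutine writes the multipart form into one end of an io.Pipe while the HTTP client streams the other end to the server, so a file is never held fully in memory. Below is a minimal, self-contained sketch of that pattern, not code from the commit; the endpoint URL, form field name, and file path are illustrative only.

package main

import (
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"path/filepath"
)

// streamUpload POSTs a local file as multipart/form-data without buffering it in RAM.
func streamUpload(url, fieldName, path string) (*http.Response, error) {
	pr, pw := io.Pipe()
	mw := multipart.NewWriter(pw)

	// Producer: writes the form into the pipe; the HTTP transport consumes it concurrently.
	go func() {
		f, err := os.Open(path)
		if err != nil {
			pw.CloseWithError(err)
			return
		}
		defer f.Close()
		part, err := mw.CreateFormFile(fieldName, filepath.Base(path))
		if err != nil {
			pw.CloseWithError(err)
			return
		}
		if _, err := io.Copy(part, f); err != nil {
			pw.CloseWithError(err)
			return
		}
		// Closing the multipart writer emits the final boundary; closing the pipe signals EOF.
		pw.CloseWithError(mw.Close())
	}()

	req, err := http.NewRequest(http.MethodPost, url, pr)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", mw.FormDataContentType())
	return http.DefaultClient.Do(req)
}

func main() {
	resp, err := streamUpload("https://example.com/attachment", "uploadedFile", "report.pdf")
	if err != nil {
		fmt.Println("upload failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}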

internal/api/attachments.go (142 changed lines)

@@ -338,3 +338,145 @@ func (s *Session) GetJobAttachments(jobID string) ([]map[string]interface{}, err
return result.Attachments, nil
}
// UploadAttachmentFile uploads a file as an attachment to a job using streaming (for large files)
// Instead of loading the entire file into memory, this method streams the file directly from the reader
func (s *Session) UploadAttachmentFile(jobID, filename, purpose string, fileReader io.Reader) (map[string]interface{}, error) {
url := fmt.Sprintf("%s/attachment", BaseURL)
// Create a pipe for streaming the multipart data
pr, pw := io.Pipe()
// Create a multipart writer
writer := multipart.NewWriter(pw)
// Start a goroutine to write the multipart form
go func() {
defer pw.Close() // Make sure to close the writer to signal end of the form
var formError error
// Log received values
log.Printf("Streaming attachment upload to job ID: %s", jobID)
log.Printf("Filename: %s", filename)
log.Printf("Purpose value: '%s'", purpose)
// The ServiceTrade API expects the purpose ID as an integer
purposeStr := strings.TrimSpace(purpose)
// Try to parse the purpose as an integer, removing any leading zeros first
purposeStr = strings.TrimLeft(purposeStr, "0")
if purposeStr == "" {
purposeStr = "0" // If only zeros were provided
}
purposeInt, err := strconv.Atoi(purposeStr)
if err != nil {
formError = fmt.Errorf("invalid purpose value '%s': must be a valid integer: %v", purpose, err)
pw.CloseWithError(formError)
return
}
log.Printf("Using purpose value: %d for job: %s", purposeInt, jobID)
// Add the purposeId (attachment type) as an integer
if err := writer.WriteField("purposeId", fmt.Sprintf("%d", purposeInt)); err != nil {
formError = fmt.Errorf("error writing purposeId field: %v", err)
pw.CloseWithError(formError)
return
}
// Add the entityType (3 for Job) and entityId (jobID)
if err := writer.WriteField("entityType", "3"); err != nil { // 3 = Job
formError = fmt.Errorf("error writing entityType field: %v", err)
pw.CloseWithError(formError)
return
}
if err := writer.WriteField("entityId", jobID); err != nil {
formError = fmt.Errorf("error writing entityId field: %v", err)
pw.CloseWithError(formError)
return
}
// Add a description field with the filename for better identification
if err := writer.WriteField("description", filename); err != nil {
formError = fmt.Errorf("error writing description field: %v", err)
pw.CloseWithError(formError)
return
}
// Check that filename has an extension
if !strings.Contains(filename, ".") {
formError = fmt.Errorf("filename must include an extension (e.g. .pdf, .docx) for API content type detection")
pw.CloseWithError(formError)
return
}
// Create the form file field
part, err := writer.CreateFormFile("uploadedFile", filename)
if err != nil {
formError = fmt.Errorf("error creating form file: %v", err)
pw.CloseWithError(formError)
return
}
// Stream the file content directly from the reader to the form
bytesWritten, err := io.Copy(part, fileReader)
if err != nil {
formError = fmt.Errorf("error copying file content: %v", err)
pw.CloseWithError(formError)
return
}
log.Printf("Streamed %d bytes to multipart form", bytesWritten)
// Close the writer to finish the multipart message
if err := writer.Close(); err != nil {
formError = fmt.Errorf("error closing multipart writer: %v", err)
pw.CloseWithError(formError)
return
}
log.Printf("Multipart form completed successfully")
}()
// Create the request with the pipe reader as the body
req, err := http.NewRequest("POST", url, pr)
if err != nil {
return nil, fmt.Errorf("error creating request: %v", err)
}
// Set headers
req.Header.Set("Content-Type", writer.FormDataContentType())
req.Header.Set("Cookie", s.Cookie)
// Don't set Content-Length since we're streaming and don't know the size in advance
// Send the request and wait for the response
resp, err := s.Client.Do(req)
if err != nil {
return nil, fmt.Errorf("error sending request: %v", err)
}
defer resp.Body.Close()
// Read the response
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
}
// Check for errors
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
log.Printf("API error response: %s - %s", resp.Status, string(body))
return nil, fmt.Errorf("API returned error: %s - %s", resp.Status, string(body))
}
// Parse the response
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("error parsing response: %v", err)
}
log.Printf("Successfully uploaded streaming attachment %s to job %s", filename, jobID)
return result, nil
}
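
For reference, a hypothetical caller of the new method opens the file and passes the *os.File straight through as the io.Reader, so the upload path never buffers the content. The helper name and the imports it assumes (os, fmt, path/filepath) are illustrative, not part of this commit.

// uploadLocalFile is a hypothetical helper in the same package as Session,
// showing the intended call pattern for the streaming upload.
func uploadLocalFile(s *Session, jobID, path, purpose string) (map[string]interface{}, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("opening %s: %w", path, err)
	}
	defer f.Close()
	// The *os.File satisfies io.Reader; UploadAttachmentFile streams it through
	// the multipart pipe instead of reading it into memory first.
	return s.UploadAttachmentFile(jobID, filepath.Base(path), purpose, f)
}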

internal/handlers/web/documents.go (262 changed lines)

@@ -195,43 +195,53 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
return
}
// Custom multipart form processing for 32-bit systems
log.Printf("Starting document upload handler with Content-Length: %.2f MB",
float64(r.ContentLength)/(1024*1024))
// Custom multipart form processing that properly streams files
reader, err := r.MultipartReader()
if err != nil {
http.Error(w, "Unable to get multipart reader: "+err.Error(), http.StatusBadRequest)
log.Printf("Error getting MultipartReader: %v", err)
http.Error(w, "Unable to process multipart form: "+err.Error(), http.StatusBadRequest)
return
}
// Store form values and file parts
// Store form values
formValues := make(map[string]string)
// Read all file contents
type DocumentWithContent struct {
Name string
Type string
FileContent []byte
FormField string // Store the original form field name
// Store file metadata for later processing
type FileMetadata struct {
FormField string
FileName string
FileSize int64
BoundaryID string
Type string // Will be set later from form values
CustomName string // Will be set later from form values
}
var docsWithContent []DocumentWithContent
var fileMetadata []FileMetadata
// First pass: collect all form fields and files
log.Printf("--- Starting multipart form processing ---")
// First pass: read form fields only (not file contents)
log.Printf("First pass - collecting form fields and file metadata")
partIndex := 0
for {
part, err := reader.NextPart()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Error reading multipart part: %v", err)
break
log.Printf("Error reading part %d: %v", partIndex, err)
http.Error(w, "Error reading multipart form: "+err.Error(), http.StatusBadRequest)
return
}
partIndex++
// Get the form field name and file name (if any)
formName := part.FormName()
fileName := part.FileName()
// If not a file, it's a regular form value
// If not a file, read the value
if fileName == "" {
// Read the form field value
// It's a regular form field, not a file
valueBytes, err := io.ReadAll(part)
if err != nil {
log.Printf("Error reading form field %s: %v", formName, err)
@@ -241,23 +251,15 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
formValues[formName] = value
log.Printf("Form field: %s = %s", formName, value)
} else if strings.HasPrefix(formName, "document-file-") {
// It's a file upload field
// Read file content
fileContent, err := io.ReadAll(part)
if err != nil {
log.Printf("Error reading file content for %s: %v", fileName, err)
continue
}
log.Printf("Found file: %s (size: %d bytes) in field: %s",
fileName, len(fileContent), formName)
// Store the file with its original field name for later processing
docsWithContent = append(docsWithContent, DocumentWithContent{
Name: fileName, // Default to original filename, will be updated with form values
Type: "", // Will be set from form values
FileContent: fileContent,
FormField: formName,
// It's a file field, but don't read the content yet
// Just store metadata for later processing
log.Printf("Found file field: %s, filename: %s", formName, fileName)
// Use the http.DetectContentType function later when we stream the file
fileMetadata = append(fileMetadata, FileMetadata{
FormField: formName,
FileName: fileName,
BoundaryID: part.FormName(),
})
}
}
@@ -274,17 +276,16 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
}
log.Printf("Job numbers: %s", jobNumbers)
// Split the job numbers
jobs := strings.Split(jobNumbers, ",")
if len(jobs) == 0 {
http.Error(w, "No valid job numbers provided", http.StatusBadRequest)
return
}
// Second pass: process document metadata
for i, doc := range docsWithContent {
suffix := strings.TrimPrefix(doc.FormField, "document-file-")
// Second pass: enrich file metadata with document type and custom name
log.Printf("Second pass - enriching file metadata with document types and custom names")
for i, metadata := range fileMetadata {
suffix := strings.TrimPrefix(metadata.FormField, "document-file-")
nameField := "document-name-" + suffix
typeField := "document-type-" + suffix
@@ -293,38 +294,37 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
if customName != "" {
// If a custom name is provided without extension, add the original file extension
if !strings.Contains(customName, ".") {
extension := filepath.Ext(doc.Name)
extension := filepath.Ext(metadata.FileName)
if extension != "" {
customName = customName + extension
}
}
docsWithContent[i].Name = customName
fileMetadata[i].CustomName = customName
}
// Get document type
docType := formValues[typeField]
if docType == "" {
log.Printf("No document type for file %s, skipping", doc.Name)
log.Printf("No document type for file %s, skipping", metadata.FileName)
continue
}
docsWithContent[i].Type = docType
log.Printf("Processing document: %s (type: %s) from field: %s",
docsWithContent[i].Name, docType, doc.FormField)
fileMetadata[i].Type = docType
log.Printf("Enriched metadata: %s (type: %s, custom name: %s)",
metadata.FileName, docType, fileMetadata[i].CustomName)
}
// Filter out documents with no type
var validDocs []DocumentWithContent
for _, doc := range docsWithContent {
if doc.Type != "" {
validDocs = append(validDocs, doc)
// Filter out files with no type
var validFiles []FileMetadata
for _, metadata := range fileMetadata {
if metadata.Type != "" {
validFiles = append(validFiles, metadata)
}
}
docsWithContent = validDocs
log.Printf("Total valid documents to upload: %d", len(docsWithContent))
fileMetadata = validFiles
if len(docsWithContent) == 0 {
log.Printf("Total valid files to upload: %d", len(fileMetadata))
if len(fileMetadata) == 0 {
http.Error(w, "No valid documents selected for upload", http.StatusBadRequest)
return
}
@@ -336,14 +336,15 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
// Channel for collecting results
type UploadResult struct {
JobID string
DocName string
Success bool
Error string
Data map[string]interface{}
JobID string
DocName string
Success bool
Error string
Data map[string]interface{}
FileSize int64
}
totalUploads := len(jobs) * len(docsWithContent)
totalUploads := len(jobs) * len(fileMetadata)
resultsChan := make(chan UploadResult, totalUploads)
// Create a wait group to track when all uploads are done
@@ -352,13 +353,16 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
// Create a semaphore channel to limit concurrent uploads
semaphore := make(chan struct{}, maxConcurrent)
// Start the upload workers
// Third pass: Start the upload workers with proper streaming
log.Printf("Third pass - starting %d upload workers for %d total uploads",
maxConcurrent, totalUploads)
for _, jobID := range jobs {
for _, doc := range docsWithContent {
for _, metadata := range fileMetadata {
wg.Add(1)
// Launch a goroutine for each job+document combination
go func(jobID string, doc DocumentWithContent) {
go func(jobID string, metadata FileMetadata) {
defer wg.Done()
// Acquire a semaphore slot
@@ -368,27 +372,70 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
// Add a small delay to avoid overwhelming the API
time.Sleep(requestDelay)
// Call the ServiceTrade API
result, err := session.UploadAttachment(jobID, doc.Name, doc.Type, doc.FileContent)
// Get the file name to use (custom name or original)
fileName := metadata.FileName
if metadata.CustomName != "" {
fileName = metadata.CustomName
}
// Create a new multipart reader for each upload to get the file part
log.Printf("Worker starting upload of %s to job %s", fileName, jobID)
fileReader, _, err := getFileReaderForUpload(r, metadata.FormField)
if err != nil {
log.Printf("Error uploading %s to job %s: %v", doc.Name, jobID, err)
log.Printf("Error creating file reader for %s: %v", fileName, err)
resultsChan <- UploadResult{
JobID: jobID,
DocName: doc.Name,
Success: false,
Error: err.Error(),
JobID: jobID,
DocName: fileName,
Success: false,
Error: fmt.Sprintf("Error preparing file: %v", err),
FileSize: 0,
}
return
}
// Create a size tracking reader
sizeTracker, ok := fileReader.(*readCloserWithSize)
if !ok {
log.Printf("Warning: fileReader is not a readCloserWithSize, size will not be tracked")
}
// Log streaming progress
log.Printf("Starting to stream file %s to job %s", fileName, jobID)
// Call ServiceTrade API with the file reader
uploadStart := time.Now()
result, err := session.UploadAttachmentFile(jobID, fileName, metadata.Type, fileReader)
uploadDuration := time.Since(uploadStart)
// Get the actual size that was uploaded
var fileSize int64
if sizeTracker != nil {
fileSize = sizeTracker.Size()
}
if err != nil {
log.Printf("Error uploading %s to job %s after %v: %v",
fileName, jobID, uploadDuration, err)
resultsChan <- UploadResult{
JobID: jobID,
DocName: fileName,
Success: false,
Error: err.Error(),
FileSize: fileSize,
}
} else {
log.Printf("Successfully uploaded %s to job %s", doc.Name, jobID)
log.Printf("Successfully uploaded %s (%.2f MB) to job %s in %v",
fileName, float64(fileSize)/(1024*1024), jobID, uploadDuration)
resultsChan <- UploadResult{
JobID: jobID,
DocName: doc.Name,
Success: true,
Data: result,
JobID: jobID,
DocName: fileName,
Success: true,
Data: result,
FileSize: fileSize,
}
}
}(jobID, doc)
}(jobID, metadata)
}
}
@@ -400,13 +447,28 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
// Collect results
results := make(map[string][]UploadResult)
resultsCount := 0
var totalBytesUploaded int64
for result := range resultsChan {
resultsCount++
log.Printf("Received result %d/%d: Job %s, File %s, Success: %v, Size: %.2f MB",
resultsCount, totalUploads, result.JobID, result.DocName, result.Success,
float64(result.FileSize)/(1024*1024))
if result.Success {
totalBytesUploaded += result.FileSize
}
if _, exists := results[result.JobID]; !exists {
results[result.JobID] = []UploadResult{}
}
results[result.JobID] = append(results[result.JobID], result)
}
log.Printf("All results collected. Total: %d, Total bytes uploaded: %.2f MB",
resultsCount, float64(totalBytesUploaded)/(1024*1024))
// Generate HTML for results
var resultHTML bytes.Buffer
@@ -529,6 +591,56 @@ func UploadDocumentsHandler(w http.ResponseWriter, r *http.Request) {
w.Write(resultHTML.Bytes())
}
// getFileReaderForUpload re-parses the multipart request to get a reader just for a specific file
// This allows streaming the file directly from the HTTP request to the API call
func getFileReaderForUpload(r *http.Request, formField string) (io.Reader, int64, error) {
mr, err := r.MultipartReader()
if err != nil {
return nil, 0, fmt.Errorf("error getting multipart reader: %w", err)
}
// Iterate through parts to find our file
for {
part, err := mr.NextPart()
if err == io.EOF {
break
}
if err != nil {
return nil, 0, fmt.Errorf("error reading multipart part: %w", err)
}
// Check if this is the file we're looking for
if part.FormName() == formField && part.FileName() != "" {
// We found our file, create a reader that tracks size
sizeTracker := &readCloserWithSize{reader: part}
return sizeTracker, 0, nil
}
}
return nil, 0, fmt.Errorf("file part %s not found in multipart form", formField)
}
// readCloserWithSize is a custom io.Reader that counts the bytes read
type readCloserWithSize struct {
reader io.ReadCloser
size int64
}
func (r *readCloserWithSize) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
r.size += int64(n)
return n, err
}
func (r *readCloserWithSize) Close() error {
return r.reader.Close()
}
// Size returns the current size of data read
func (r *readCloserWithSize) Size() int64 {
return r.size
}
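
The counting wrapper above is independent of multipart parsing; as a hypothetical standalone illustration (not part of the commit, and assuming the os and io imports), it can wrap any io.ReadCloser, be drained, and report how many bytes passed through:

// countFileBytes is a hypothetical example: wrap a local file in the counting
// reader, drain it, and return the number of bytes that were read.
func countFileBytes(path string) (int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	counter := &readCloserWithSize{reader: f}
	defer counter.Close()
	if _, err := io.Copy(io.Discard, counter); err != nil {
		return 0, err
	}
	return counter.Size(), nil
}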
// DocumentFieldAddHandler generates a new document field for the form
func DocumentFieldAddHandler(w http.ResponseWriter, r *http.Request) {
// Generate a random ID for the new field
