diff --git a/lib/appwrite/client.rb b/lib/appwrite/client.rb index 609c746..45257b6 100644 --- a/lib/appwrite/client.rb +++ b/lib/appwrite/client.rb @@ -262,56 +262,133 @@ def chunked_upload( offset = 0 id_param_name = id_param_name.to_sym if id_param_name + upload_id = nil + chunks_uploaded = 0 if id_param_name&.empty? == false + upload_id = params[id_param_name] # Make a request to check if a file already exists - current = call( - method: "GET", - path: "#{path}/#{params[id_param_name]}", - headers: headers, - params: {} - ) - chunks_uploaded = current['chunksUploaded'].to_i - offset = chunks_uploaded * @chunk_size + begin + current = call( + method: "GET", + path: "#{path}/#{params[id_param_name]}", + headers: headers, + params: {} + ) + chunks_uploaded = current['chunksUploaded'].to_i + offset = chunks_uploaded * @chunk_size + rescue Appwrite::Exception => error + raise error unless error.code.to_i == 404 + end end + total_chunks = (size.to_f / @chunk_size).ceil + chunks = [] while offset < size + chunks << { + index: chunks_uploaded.to_i, + start: offset, + ending: [offset + @chunk_size, size].min + } + offset += @chunk_size + chunks_uploaded = chunks_uploaded.to_i + 1 + end + + result = current if defined?(current) + return result unless chunks.any? + + upload_chunk = lambda do |chunk, current_upload_id| case input_file.source_type when 'path' - string = IO.read(input_file.path, @chunk_size, offset) + string = IO.read(input_file.path, chunk[:ending] - chunk[:start], chunk[:start]) when 'string' - string = input_file.data.byteslice(offset, [@chunk_size, size - offset].min) + string = input_file.data.byteslice(chunk[:start], chunk[:ending] - chunk[:start]) end - params[param_name.to_sym] = InputFile::from_string( + chunk_params = params.merge(param_name.to_sym => InputFile::from_string( string, filename: input_file.filename, mime_type: input_file.mime_type - ) + )) - headers['content-range'] = "bytes #{offset}-#{[offset + @chunk_size - 1, size - 1].min}/#{size}" + chunk_headers = headers.merge('content-range' => "bytes #{chunk[:start]}-#{chunk[:ending] - 1}/#{size}") + chunk_headers['x-appwrite-id'] = current_upload_id if current_upload_id - result = call( + call( method: 'POST', path: path, - headers: headers, - params: params, + headers: chunk_headers, + params: chunk_params, ) + end - offset += @chunk_size + result = upload_chunk.call(chunks.first, upload_id) + upload_id = result['$id'] if result['$id'] + completed_count = chunks.first[:index] + 1 + uploaded_size = chunks.first[:ending] - if defined? result['$id'] - headers['x-appwrite-id'] = result['$id'] - end + upload_complete = lambda do |chunk_result| + chunks_uploaded = chunk_result['chunksUploaded'] + return false if chunks_uploaded.nil? - on_progress.call({ - id: result['$id'], - progress: ([offset, size].min).to_f/size.to_f * 100.0, - size_uploaded: [offset, size].min, - chunks_total: result['chunksTotal'], - chunks_uploaded: result['chunksUploaded'] - }) unless on_progress.nil? + chunks_total = chunk_result['chunksTotal'] || total_chunks + chunks_uploaded.to_i >= chunks_total.to_i end + on_progress.call({ + id: result['$id'], + progress: uploaded_size.to_f/size.to_f * 100.0, + size_uploaded: uploaded_size, + chunks_total: result['chunksTotal'] || total_chunks, + chunks_uploaded: result['chunksUploaded'] || completed_count + }) unless on_progress.nil? + + mutex = Mutex.new + queue = Queue.new + chunks.drop(1).each { |chunk| queue << chunk } + first_error = nil + last_result = result + completed_result = nil + + workers = [8, queue.size].min.times.map do + Thread.new do + loop do + break if mutex.synchronize { !first_error.nil? } + + chunk = begin + queue.pop(true) + rescue ThreadError + nil + end + break unless chunk + + begin + chunk_result = upload_chunk.call(chunk, upload_id) + rescue => error + mutex.synchronize { first_error ||= error } + break + end + mutex.synchronize do + completed_count += 1 + uploaded_size += chunk[:ending] - chunk[:start] + last_result = chunk_result + completed_result = chunk_result if upload_complete.call(chunk_result) + on_progress.call({ + id: upload_id, + progress: uploaded_size.to_f/size.to_f * 100.0, + size_uploaded: uploaded_size, + chunks_total: chunk_result['chunksTotal'] || total_chunks, + chunks_uploaded: chunk_result['chunksUploaded'] || completed_count + }) unless on_progress.nil? + end + end + end + end + + workers.each(&:join) + raise first_error if first_error + + result = completed_result || last_result + return result unless response_type.respond_to?("from") response_type.from(map: result) @@ -329,8 +406,8 @@ def fetch( ) raise ArgumentError, 'Too Many HTTP Redirects' if limit == 0 - @http = Net::HTTP.new(uri.host, uri.port) unless defined? @http - @http.use_ssl = !@self_signed + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = !@self_signed payload = '' headers = @headers.merge(headers) @@ -351,7 +428,7 @@ def fetch( end begin - response = @http.send_request(method, uri.request_uri, payload, headers) + response = http.send_request(method, uri.request_uri, payload, headers) rescue => error raise Appwrite::Exception.new(error.message) end @@ -382,7 +459,7 @@ def fetch( end if response.code.to_i >= 400 - raise Appwrite::Exception.new(result['message'], result['status'], result['type'], response.body) + raise Appwrite::Exception.new(result['message'], response.code, result['type'], response.body) end unless response_type.respond_to?("from")