Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 108 additions & 31 deletions lib/appwrite/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -262,56 +262,133 @@ def chunked_upload(

offset = 0
id_param_name = id_param_name.to_sym if id_param_name
upload_id = nil
chunks_uploaded = 0
if id_param_name&.empty? == false
upload_id = params[id_param_name]
# Make a request to check if a file already exists
current = call(
method: "GET",
path: "#{path}/#{params[id_param_name]}",
headers: headers,
params: {}
)
chunks_uploaded = current['chunksUploaded'].to_i
offset = chunks_uploaded * @chunk_size
begin
current = call(
method: "GET",
path: "#{path}/#{params[id_param_name]}",
headers: headers,
params: {}
)
chunks_uploaded = current['chunksUploaded'].to_i
offset = chunks_uploaded * @chunk_size
rescue Appwrite::Exception => error
raise error unless error.code.to_i == 404
end
end

total_chunks = (size.to_f / @chunk_size).ceil
chunks = []
while offset < size
chunks << {
index: chunks_uploaded.to_i,
start: offset,
ending: [offset + @chunk_size, size].min
}
offset += @chunk_size
chunks_uploaded = chunks_uploaded.to_i + 1
end

result = current if defined?(current)
return result unless chunks.any?

upload_chunk = lambda do |chunk, current_upload_id|
case input_file.source_type
when 'path'
string = IO.read(input_file.path, @chunk_size, offset)
string = IO.read(input_file.path, chunk[:ending] - chunk[:start], chunk[:start])
when 'string'
string = input_file.data.byteslice(offset, [@chunk_size, size - offset].min)
string = input_file.data.byteslice(chunk[:start], chunk[:ending] - chunk[:start])
end

params[param_name.to_sym] = InputFile::from_string(
chunk_params = params.merge(param_name.to_sym => InputFile::from_string(
string,
filename: input_file.filename,
mime_type: input_file.mime_type
)
))

headers['content-range'] = "bytes #{offset}-#{[offset + @chunk_size - 1, size - 1].min}/#{size}"
chunk_headers = headers.merge('content-range' => "bytes #{chunk[:start]}-#{chunk[:ending] - 1}/#{size}")
chunk_headers['x-appwrite-id'] = current_upload_id if current_upload_id

result = call(
call(
method: 'POST',
path: path,
headers: headers,
params: params,
headers: chunk_headers,
params: chunk_params,
)
end

offset += @chunk_size
result = upload_chunk.call(chunks.first, upload_id)
upload_id = result['$id'] if result['$id']
completed_count = chunks.first[:index] + 1
uploaded_size = chunks.first[:ending]

if defined? result['$id']
headers['x-appwrite-id'] = result['$id']
end
upload_complete = lambda do |chunk_result|
chunks_uploaded = chunk_result['chunksUploaded']
return false if chunks_uploaded.nil?

on_progress.call({
id: result['$id'],
progress: ([offset, size].min).to_f/size.to_f * 100.0,
size_uploaded: [offset, size].min,
chunks_total: result['chunksTotal'],
chunks_uploaded: result['chunksUploaded']
}) unless on_progress.nil?
chunks_total = chunk_result['chunksTotal'] || total_chunks
chunks_uploaded.to_i >= chunks_total.to_i
end

on_progress.call({
id: result['$id'],
progress: uploaded_size.to_f/size.to_f * 100.0,
size_uploaded: uploaded_size,
chunks_total: result['chunksTotal'] || total_chunks,
chunks_uploaded: result['chunksUploaded'] || completed_count
}) unless on_progress.nil?

mutex = Mutex.new
queue = Queue.new
chunks.drop(1).each { |chunk| queue << chunk }
first_error = nil
last_result = result
completed_result = nil

workers = [8, queue.size].min.times.map do
Thread.new do
loop do
break if mutex.synchronize { !first_error.nil? }

chunk = begin
queue.pop(true)
rescue ThreadError
nil
end
break unless chunk

begin
chunk_result = upload_chunk.call(chunk, upload_id)
rescue => error
mutex.synchronize { first_error ||= error }
break
end
mutex.synchronize do
completed_count += 1
uploaded_size += chunk[:ending] - chunk[:start]
last_result = chunk_result
completed_result = chunk_result if upload_complete.call(chunk_result)
on_progress.call({
id: upload_id,
progress: uploaded_size.to_f/size.to_f * 100.0,
size_uploaded: uploaded_size,
chunks_total: chunk_result['chunksTotal'] || total_chunks,
chunks_uploaded: chunk_result['chunksUploaded'] || completed_count
}) unless on_progress.nil?
end
end
end
end

workers.each(&:join)
raise first_error if first_error

result = completed_result || last_result

return result unless response_type.respond_to?("from")

response_type.from(map: result)
Expand All @@ -329,8 +406,8 @@ def fetch(
)
raise ArgumentError, 'Too Many HTTP Redirects' if limit == 0

@http = Net::HTTP.new(uri.host, uri.port) unless defined? @http
@http.use_ssl = !@self_signed
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = !@self_signed
payload = ''

headers = @headers.merge(headers)
Expand All @@ -351,7 +428,7 @@ def fetch(
end

begin
response = @http.send_request(method, uri.request_uri, payload, headers)
response = http.send_request(method, uri.request_uri, payload, headers)
rescue => error
raise Appwrite::Exception.new(error.message)
end
Expand Down Expand Up @@ -382,7 +459,7 @@ def fetch(
end

if response.code.to_i >= 400
raise Appwrite::Exception.new(result['message'], result['status'], result['type'], response.body)
raise Appwrite::Exception.new(result['message'], response.code, result['type'], response.body)
end

unless response_type.respond_to?("from")
Expand Down