diff --git a/.github/workflows/sync-bench.yml b/.github/workflows/sync-bench.yml new file mode 100644 index 0000000..2713f79 --- /dev/null +++ b/.github/workflows/sync-bench.yml @@ -0,0 +1,63 @@ +name: sync benchmark + +on: + workflow_dispatch: + inputs: + poll_delay_ms: + description: Polling delay between receiver checks, in milliseconds + required: false + default: '250' + max_polls: + description: Maximum number of receiver polling attempts + required: false + default: '40' + curl_pool: + description: Enable curl connection pool; use 0 to disable + required: false + default: '1' + +permissions: + contents: read + +jobs: + sync-bench-debug: + name: sync-bench-debug linux x86_64 + runs-on: ubuntu-22.04 + timeout-minutes: 20 + + env: + SYNC_BENCH_DATABASE_ID: ${{ secrets.SYNC_BENCH_DATABASE_ID }} + SYNC_BENCH_CLOUDSYNC_ADDRESS: ${{ secrets.SYNC_BENCH_CLOUDSYNC_ADDRESS }} + SYNC_BENCH_APIKEY: ${{ secrets.SYNC_BENCH_APIKEY }} + SYNC_BENCH_OUTPUT: json + SYNC_BENCH_POLL_DELAY_MS: ${{ inputs.poll_delay_ms }} + SYNC_BENCH_MAX_POLLS: ${{ inputs.max_polls }} + CLOUDSYNC_CURL_POOL: ${{ inputs.curl_pool }} + + steps: + - uses: actions/checkout@v4.2.2 + with: + submodules: true + + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y gcc make curl sqlite3 unzip + + - name: Build debug benchmark + run: make SYNC_BENCH_DEBUG=1 extension dist/sync_bench + + - name: Run sync benchmark + run: | + mkdir -p artifacts + ./dist/sync_bench > artifacts/sync-bench.json 2> artifacts/sync-bench.trace.log + + - name: Upload benchmark artifacts + if: always() + uses: actions/upload-artifact@v4 + with: + name: sync-bench-debug-${{ github.run_id }} + path: | + artifacts/sync-bench.json + artifacts/sync-bench.trace.log + if-no-files-found: warn diff --git a/CHANGELOG.md b/CHANGELOG.md index ea76c25..158a6ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). +## [1.0.20] - 2026-05-26 + +### Changed + +- **Improved network sync performance** by reducing request overhead during `cloudsync_network_send_changes()`, especially for small payloads that can now be applied without the extra upload-URL round trip. +- **Improved repeated sync request latency** by allowing the network layer to reuse HTTP connections across CloudSync API calls. + +## [1.0.19] - 2026-05-15 + +### Added + +- **Mac Catalyst support**. + ## [1.0.18] - 2026-04-29 ### Fixed diff --git a/Makefile b/Makefile index 1257835..189bf0f 100644 --- a/Makefile +++ b/Makefile @@ -185,6 +185,10 @@ endif T_LDFLAGS += -fprofile-arcs -ftest-coverage endif +ifdef SYNC_BENCH_DEBUG + CFLAGS += -DCLOUDSYNC_NETWORK_TRACE +endif + # Native network support only for Apple platforms ifdef NATIVE_NETWORK RELEASE_OBJ += $(patsubst %.m, $(BUILD_RELEASE)/%_m.o, $(notdir $(wildcard $(NETWORK_DIR)/*.m))) @@ -271,6 +275,25 @@ e2e: $(TARGET) $(DIST_DIR)/integration$(EXE) fi; \ ./$(DIST_DIR)/integration$(EXE) +# Run the sync performance benchmark. This is intentionally separate from e2e +# because timings depend on network/server load and polling configuration. +sync-bench: $(TARGET) $(DIST_DIR)/sync_bench$(EXE) + @if [ -f .env ]; then \ + export $$(grep -v '^#' .env | xargs); \ + fi; \ + if [ -n "$(SYNC_BENCH_DATABASE_ID)" ]; then export SYNC_BENCH_DATABASE_ID="$(SYNC_BENCH_DATABASE_ID)"; fi; \ + if [ -n "$(SYNC_BENCH_CLOUDSYNC_ADDRESS)" ]; then export SYNC_BENCH_CLOUDSYNC_ADDRESS="$(SYNC_BENCH_CLOUDSYNC_ADDRESS)"; fi; \ + if [ -n "$(SYNC_BENCH_APIKEY)" ]; then export SYNC_BENCH_APIKEY="$(SYNC_BENCH_APIKEY)"; fi; \ + if [ -n "$(SYNC_BENCH_POLL_DELAY_MS)" ]; then export SYNC_BENCH_POLL_DELAY_MS="$(SYNC_BENCH_POLL_DELAY_MS)"; fi; \ + if [ -n "$(SYNC_BENCH_MAX_POLLS)" ]; then export SYNC_BENCH_MAX_POLLS="$(SYNC_BENCH_MAX_POLLS)"; fi; \ + if [ -n "$(SYNC_BENCH_RANDOM_BLOB_SIZE_BYTES)" ]; then export SYNC_BENCH_RANDOM_BLOB_SIZE_BYTES="$(SYNC_BENCH_RANDOM_BLOB_SIZE_BYTES)"; fi; \ + if [ -n "$(SYNC_BENCH_CLEANUP_OLDER_THAN_SECONDS)" ]; then export SYNC_BENCH_CLEANUP_OLDER_THAN_SECONDS="$(SYNC_BENCH_CLEANUP_OLDER_THAN_SECONDS)"; fi; \ + if [ -n "$(SYNC_BENCH_OUTPUT)" ]; then export SYNC_BENCH_OUTPUT="$(SYNC_BENCH_OUTPUT)"; fi; \ + ./$(DIST_DIR)/sync_bench$(EXE) + +sync-bench-debug: + $(MAKE) SYNC_BENCH_DEBUG=1 sync-bench + OPENSSL_TARBALL = $(OPENSSL_DIR)/$(OPENSSL_VERSION).tar.gz $(OPENSSL_TARBALL): diff --git a/src/cloudsync.h b/src/cloudsync.h index b6f5441..56c4d2b 100644 --- a/src/cloudsync.h +++ b/src/cloudsync.h @@ -18,7 +18,7 @@ extern "C" { #endif -#define CLOUDSYNC_VERSION "1.0.19" +#define CLOUDSYNC_VERSION "1.0.20" #define CLOUDSYNC_MAX_TABLENAME_LEN 512 #define CLOUDSYNC_VALUE_NOTSET -1 diff --git a/src/network/network.c b/src/network/network.c index 79315d0..652f96c 100644 --- a/src/network/network.c +++ b/src/network/network.c @@ -10,6 +10,16 @@ #include #include #include +#include +#include + +#ifdef CLOUDSYNC_NETWORK_TRACE +#ifdef _WIN32 +#include +#else +#include +#endif +#endif #include "network.h" #include "../utils.h" @@ -37,6 +47,19 @@ static size_t cacert_len = sizeof(cacert_pem) - 1; #define CLOUDSYNC_NETWORK_MINBUF_SIZE 512 #define CLOUDSYNC_SESSION_TOKEN_MAXSIZE 4096 +#ifndef CLOUDSYNC_CURL_MAXCONNECTS +#define CLOUDSYNC_CURL_MAXCONNECTS 2L +#endif +#ifndef CLOUDSYNC_CURL_MAXAGE_CONN_SECONDS +#define CLOUDSYNC_CURL_MAXAGE_CONN_SECONDS 15L +#endif +#ifndef CLOUDSYNC_CURL_MAXLIFETIME_CONN_SECONDS +#define CLOUDSYNC_CURL_MAXLIFETIME_CONN_SECONDS 60L +#endif +#ifndef CLOUDSYNC_NETWORK_FAST_LANE_MAX_BLOB_SIZE +#define CLOUDSYNC_NETWORK_FAST_LANE_MAX_BLOB_SIZE (128 * 1024) +#endif + #define DEFAULT_SYNC_WAIT_MS 100 #define DEFAULT_SYNC_MAX_RETRIES 1 @@ -52,12 +75,87 @@ struct network_data { char site_id[UUID_STR_MAXLEN]; char *authentication; // apikey or token char *org_id; // organization ID for X-CloudSync-Org header + char *ticket; // optional short-lived sync runtime ticket + char *ticket_expires_at; char *check_endpoint; char *upload_endpoint; char *apply_endpoint; char *status_endpoint; + int ticket_enabled; +#ifndef CLOUDSYNC_OMIT_CURL + CURL *api_curl; + CURL *artifact_curl; + int curl_pool_enabled; +#endif }; +#ifdef CLOUDSYNC_NETWORK_TRACE +double network_trace_now_ms(void) { +#ifdef _WIN32 + LARGE_INTEGER freq; + LARGE_INTEGER counter; + QueryPerformanceFrequency(&freq); + QueryPerformanceCounter(&counter); + return ((double)counter.QuadPart * 1000.0) / (double)freq.QuadPart; +#else + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ((double)ts.tv_sec * 1000.0) + ((double)ts.tv_nsec / 1000000.0); +#endif +} + +const char *network_trace_endpoint_name(network_data *data, const char *endpoint) { + if (!data || !endpoint) return "unknown"; + if (data->check_endpoint && strcmp(endpoint, data->check_endpoint) == 0) return "check"; + if (data->upload_endpoint && strcmp(endpoint, data->upload_endpoint) == 0) return "upload-url"; + if (data->apply_endpoint && strcmp(endpoint, data->apply_endpoint) == 0) return "apply"; + if (data->status_endpoint && strcmp(endpoint, data->status_endpoint) == 0) return "status"; + return "artifact"; +} + +const char *network_trace_result_name(int code) { + switch (code) { + case CLOUDSYNC_NETWORK_OK: return "ok"; + case CLOUDSYNC_NETWORK_ERROR: return "error"; + case CLOUDSYNC_NETWORK_BUFFER: return "buffer"; + default: return "unknown"; + } +} + +void network_trace_log(network_data *data, const char *method, const char *endpoint, long http_status, int result_code, size_t request_bytes, size_t bytes, double elapsed_ms) { + fprintf(stderr, + "[cloudsync-network] endpoint=%s method=%s http_status=%ld result=%s request_bytes=%zu bytes=%zu elapsed_ms=%.2f\n", + network_trace_endpoint_name(data, endpoint), method, http_status, + network_trace_result_name(result_code), request_bytes, bytes, elapsed_ms); +} + +#ifndef CLOUDSYNC_OMIT_CURL +void network_trace_log_curl(network_data *data, const char *method, const char *endpoint, long http_status, int result_code, size_t request_bytes, size_t bytes, CURL *curl, bool pooled, double elapsed_ms) { + double namelookup = 0.0; + double connect = 0.0; + double appconnect = 0.0; + double starttransfer = 0.0; + double total = 0.0; + long num_connects = 0; + if (curl) { + curl_easy_getinfo(curl, CURLINFO_NAMELOOKUP_TIME, &namelookup); + curl_easy_getinfo(curl, CURLINFO_CONNECT_TIME, &connect); + curl_easy_getinfo(curl, CURLINFO_APPCONNECT_TIME, &appconnect); + curl_easy_getinfo(curl, CURLINFO_STARTTRANSFER_TIME, &starttransfer); + curl_easy_getinfo(curl, CURLINFO_TOTAL_TIME, &total); + curl_easy_getinfo(curl, CURLINFO_NUM_CONNECTS, &num_connects); + } + fprintf(stderr, + "[cloudsync-network] endpoint=%s method=%s pool=%s http_status=%ld result=%s request_bytes=%zu bytes=%zu elapsed_ms=%.2f curl_total_ms=%.2f dns_ms=%.2f connect_ms=%.2f tls_ms=%.2f starttransfer_ms=%.2f num_connects=%ld\n", + network_trace_endpoint_name(data, endpoint), method, + pooled ? "on" : "off", http_status, + network_trace_result_name(result_code), request_bytes, bytes, elapsed_ms, + total * 1000.0, namelookup * 1000.0, connect * 1000.0, + appconnect * 1000.0, starttransfer * 1000.0, num_connects); +} +#endif +#endif + typedef struct { char *buffer; size_t balloc; @@ -72,6 +170,11 @@ typedef struct { size_t read_pos; } network_read_data; +typedef struct { + char *ticket; + char *expires_at; +} network_ticket_headers; + static const char *cloudsync_default_headers[] = { CLOUDSYNC_HEADER_VERSION_LINE, }; @@ -101,12 +204,25 @@ char *network_data_get_orgid (network_data *data) { return data->org_id; } +char *network_data_get_ticket (network_data *data) { + return data->ticket; +} + +static void network_data_clear_ticket (network_data *data) { + if (!data) return; + if (data->ticket) cloudsync_memory_free(data->ticket); + if (data->ticket_expires_at) cloudsync_memory_free(data->ticket_expires_at); + data->ticket = NULL; + data->ticket_expires_at = NULL; +} + bool network_data_set_endpoints (network_data *data, char *auth, char *check, char *upload, char *apply, char *status) { // sanity check if (!check || !upload) return false; // always free previous owned pointers if (data->authentication) cloudsync_memory_free(data->authentication); + network_data_clear_ticket(data); if (data->check_endpoint) cloudsync_memory_free(data->check_endpoint); if (data->upload_endpoint) cloudsync_memory_free(data->upload_endpoint); if (data->apply_endpoint) cloudsync_memory_free(data->apply_endpoint); @@ -162,8 +278,13 @@ bool network_data_set_endpoints (network_data *data, char *auth, char *check, ch void network_data_free (network_data *data) { if (!data) return; +#ifndef CLOUDSYNC_OMIT_CURL + if (data->api_curl) curl_easy_cleanup(data->api_curl); + if (data->artifact_curl) curl_easy_cleanup(data->artifact_curl); +#endif if (data->authentication) cloudsync_memory_free(data->authentication); if (data->org_id) cloudsync_memory_free(data->org_id); + network_data_clear_ticket(data); if (data->check_endpoint) cloudsync_memory_free(data->check_endpoint); if (data->upload_endpoint) cloudsync_memory_free(data->upload_endpoint); if (data->apply_endpoint) cloudsync_memory_free(data->apply_endpoint); @@ -173,7 +294,89 @@ void network_data_free (network_data *data) { // MARK: - Utils - +static bool network_endpoint_is_api(network_data *data, const char *endpoint) { + if (!data || !endpoint) return false; + return (data->check_endpoint && strcmp(endpoint, data->check_endpoint) == 0) || + (data->upload_endpoint && strcmp(endpoint, data->upload_endpoint) == 0) || + (data->apply_endpoint && strcmp(endpoint, data->apply_endpoint) == 0) || + (data->status_endpoint && strcmp(endpoint, data->status_endpoint) == 0); +} + +static bool network_env_disabled(const char *value) { + return value && (strcmp(value, "0") == 0 || strcmp(value, "false") == 0 || strcmp(value, "off") == 0 || strcmp(value, "no") == 0); +} + +static bool network_ticket_enabled(network_data *data) { + if (!data) return false; + if (data->ticket_enabled == 0) { + const char *value = getenv("CLOUDSYNC_NETWORK_TICKET"); + data->ticket_enabled = network_env_disabled(value) ? -1 : 1; + } + return data->ticket_enabled > 0; +} + +bool network_data_should_use_ticket (network_data *data, const char *endpoint, const char *authentication) { + return data && authentication && authentication[0] != '\0' && data->ticket && data->ticket[0] != '\0' && + network_ticket_enabled(data) && network_endpoint_is_api(data, endpoint); +} + +void network_data_update_ticket (network_data *data, const char *ticket, const char *expires_at) { + if (!data || !ticket || ticket[0] == '\0') return; + + char *ticket_copy = cloudsync_string_dup(ticket); + if (!ticket_copy) return; + + char *expires_copy = NULL; + if (expires_at && expires_at[0] != '\0') { + expires_copy = cloudsync_string_dup(expires_at); + if (!expires_copy) { + cloudsync_memory_free(ticket_copy); + return; + } + } + + network_data_clear_ticket(data); + data->ticket = ticket_copy; + data->ticket_expires_at = expires_copy; + +#ifdef CLOUDSYNC_NETWORK_TRACE + fprintf(stderr, + "[cloudsync-network] received_ticket=%s expires_at=%s\n", + data->ticket ? "true" : "false", data->ticket_expires_at ? data->ticket_expires_at : ""); +#endif +} + #ifndef CLOUDSYNC_OMIT_CURL +static bool network_curl_pool_enabled(network_data *data) { + if (!data) return false; + if (data->curl_pool_enabled == 0) { + const char *value = getenv("CLOUDSYNC_CURL_POOL"); + data->curl_pool_enabled = network_env_disabled(value) ? -1 : 1; + } + return data->curl_pool_enabled > 0; +} + +static CURL *network_curl_for_endpoint(network_data *data, const char *endpoint, bool *pooled) { + if (pooled) *pooled = false; + if (!network_curl_pool_enabled(data)) { + return curl_easy_init(); + } + + CURL **slot = network_endpoint_is_api(data, endpoint) ? &data->api_curl : &data->artifact_curl; + if (!*slot) { + *slot = curl_easy_init(); + } else { + curl_easy_reset(*slot); + } + if (!*slot) return NULL; + + curl_easy_setopt(*slot, CURLOPT_MAXCONNECTS, CLOUDSYNC_CURL_MAXCONNECTS); + curl_easy_setopt(*slot, CURLOPT_MAXAGE_CONN, CLOUDSYNC_CURL_MAXAGE_CONN_SECONDS); + curl_easy_setopt(*slot, CURLOPT_MAXLIFETIME_CONN, CLOUDSYNC_CURL_MAXLIFETIME_CONN_SECONDS); + if (pooled) *pooled = true; + return *slot; +} + static bool network_buffer_check (network_buffer *data, size_t needed) { // alloc/resize buffer if (data->bused + needed > data->balloc) { @@ -204,14 +407,71 @@ static size_t network_receive_callback (void *ptr, size_t size, size_t nmemb, vo return (size * nmemb); } +static bool network_header_eq(const char *line, size_t len, const char *name) { + size_t name_len = strlen(name); + if (len <= name_len || line[name_len] != ':') return false; + for (size_t i = 0; i < name_len; i++) { + if (tolower((unsigned char)line[i]) != tolower((unsigned char)name[i])) return false; + } + return true; +} + +static char *network_header_value_dup(const char *line, size_t len, const char *name) { + size_t name_len = strlen(name); + const char *start = line + name_len + 1; + const char *end = line + len; + + while (start < end && (*start == ' ' || *start == '\t')) start++; + while (end > start && (end[-1] == '\r' || end[-1] == '\n' || end[-1] == ' ' || end[-1] == '\t')) end--; + + size_t value_len = (size_t)(end - start); + char *value = cloudsync_memory_zeroalloc(value_len + 1); + if (!value) return NULL; + memcpy(value, start, value_len); + value[value_len] = '\0'; + return value; +} + +static size_t network_header_callback(char *buffer, size_t size, size_t nitems, void *userdata) { + network_ticket_headers *ticket_headers = (network_ticket_headers *)userdata; + size_t len = size * nitems; + + if (network_header_eq(buffer, len, CLOUDSYNC_HEADER_TICKET)) { + char *ticket = network_header_value_dup(buffer, len, CLOUDSYNC_HEADER_TICKET); + if (ticket) { + if (ticket_headers->ticket) cloudsync_memory_free(ticket_headers->ticket); + ticket_headers->ticket = ticket; + } + } else if (network_header_eq(buffer, len, CLOUDSYNC_HEADER_TICKET_EXPIRES_AT)) { + char *expires_at = network_header_value_dup(buffer, len, CLOUDSYNC_HEADER_TICKET_EXPIRES_AT); + if (expires_at) { + if (ticket_headers->expires_at) cloudsync_memory_free(ticket_headers->expires_at); + ticket_headers->expires_at = expires_at; + } + } + + return len; +} + NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, const char *authentication, bool zero_terminated, bool is_post_request, char *json_payload, const char **extra_headers, int nextra_headers) { char *buffer = NULL; size_t blen = 0; struct curl_slist* headers = NULL; + network_ticket_headers ticket_headers = {NULL, NULL}; char errbuf[CURL_ERROR_SIZE] = {0}; long response_code = 0; + bool pooled = false; + bool using_ticket = network_data_should_use_ticket(data, endpoint, authentication); + const char *method = (json_payload || is_post_request) ? "POST" : "GET"; +#ifndef CLOUDSYNC_NETWORK_TRACE + (void)method; +#endif +#ifdef CLOUDSYNC_NETWORK_TRACE + double trace_start_ms = network_trace_now_ms(); + size_t request_bytes = json_payload ? strlen(json_payload) : 0; +#endif - CURL *curl = curl_easy_init(); + CURL *curl = network_curl_for_endpoint(data, endpoint, &pooled); if (!curl) return (NETWORK_RESULT){CLOUDSYNC_NETWORK_ERROR, NULL, 0, NULL, NULL}; // a buffer to store errors in @@ -256,12 +516,21 @@ NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, if (!tmp) {rc = CURLE_OUT_OF_MEMORY; goto cleanup;} headers = tmp; } + if (using_ticket) { + char ticket_header[CLOUDSYNC_SESSION_TOKEN_MAXSIZE]; + snprintf(ticket_header, sizeof(ticket_header), "%s: %s", CLOUDSYNC_HEADER_TICKET, data->ticket); + struct curl_slist *tmp = curl_slist_append(headers, ticket_header); + if (!tmp) {rc = CURLE_OUT_OF_MEMORY; goto cleanup;} + headers = tmp; + } if (headers) curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); network_buffer netdata = {NULL, 0, 0, (zero_terminated) ? 1 : 0}; curl_easy_setopt(curl, CURLOPT_WRITEDATA, &netdata); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, network_receive_callback); + curl_easy_setopt(curl, CURLOPT_HEADERDATA, &ticket_headers); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, network_header_callback); // add optional JSON payload (implies setting CURLOPT_POST to 1) // or set the CURLOPT_POST option @@ -278,14 +547,18 @@ NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, if (rc == CURLE_OK) { buffer = netdata.buffer; blen = netdata.bused; + if (response_code < 400 && ticket_headers.ticket) { + network_data_update_ticket(data, ticket_headers.ticket, ticket_headers.expires_at); + } } else if (netdata.buffer) { cloudsync_memory_free(netdata.buffer); netdata.buffer = NULL; } cleanup: - if (curl) curl_easy_cleanup(curl); if (headers) curl_slist_free_all(headers); + if (ticket_headers.ticket) cloudsync_memory_free(ticket_headers.ticket); + if (ticket_headers.expires_at) cloudsync_memory_free(ticket_headers.expires_at); // build result NETWORK_RESULT result = {0, NULL, 0, NULL, NULL}; @@ -299,6 +572,14 @@ NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, result.blen = buffer ? blen : rc; } + #ifdef CLOUDSYNC_NETWORK_TRACE + fprintf(stderr, + "[cloudsync-network] endpoint=%s using_ticket=%s\n", + network_trace_endpoint_name(data, endpoint), + using_ticket ? "true" : "false"); + network_trace_log_curl(data, method, endpoint, response_code, result.code, request_bytes, result.blen, curl, pooled, network_trace_now_ms() - trace_start_ms); + #endif + if (curl && !pooled) curl_easy_cleanup(curl); return result; } @@ -321,9 +602,14 @@ bool network_send_buffer (network_data *data, const char *endpoint, const char * bool result = false; char errbuf[CURL_ERROR_SIZE] = {0}; CURLcode rc = CURLE_OK; + long response_code = 0; + bool pooled = false; +#ifdef CLOUDSYNC_NETWORK_TRACE + double trace_start_ms = network_trace_now_ms(); +#endif - // init curl - CURL *curl = curl_easy_init(); + // init/reuse curl + CURL *curl = network_curl_for_endpoint(data, endpoint, &pooled); if (!curl) return false; // set the URL @@ -393,10 +679,18 @@ bool network_send_buffer (network_data *data, const char *endpoint, const char * // perform the upload rc = curl_easy_perform(curl); + if (curl) curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code); if (rc == CURLE_OK) result = true; cleanup: - if (curl) curl_easy_cleanup(curl); + #ifdef CLOUDSYNC_NETWORK_TRACE + network_trace_log_curl(data, "PUT", endpoint, response_code, + result ? CLOUDSYNC_NETWORK_OK : CLOUDSYNC_NETWORK_ERROR, + (size_t)blob_size, + result ? (size_t)blob_size : 0, + curl, pooled, network_trace_now_ms() - trace_start_ms); + #endif + if (curl && !pooled) curl_easy_cleanup(curl); if (headers) curl_slist_free_all(headers); return result; } @@ -730,6 +1024,8 @@ static bool network_compute_endpoints_with_address (sqlite3_context *context, ne snprintf(status_endpoint, requested, "%s/%s/%s/%s/%s", address, CLOUDSYNC_ENDPOINT_PREFIX, managedDatabaseId, data->site_id, CLOUDSYNC_ENDPOINT_STATUS); + network_data_clear_ticket(data); + if (data->check_endpoint) cloudsync_memory_free(data->check_endpoint); data->check_endpoint = check_endpoint; @@ -845,6 +1141,7 @@ bool cloudsync_network_set_authentication_token (sqlite3_context *context, const if (!new_auth_token) return false; if (data->authentication) cloudsync_memory_free(data->authentication); + network_data_clear_ticket(data); data->authentication = new_auth_token; return true; @@ -923,6 +1220,63 @@ static char *json_extract_failure_stage(const char *json, size_t json_len, const return stage; } +static char *network_base64_encode(const unsigned char *src, size_t len) { + static const char table[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + if (!src && len > 0) return NULL; + if (len > (SIZE_MAX - 1) / 4 * 3) return NULL; + + size_t out_len = 4 * ((len + 2) / 3); + char *out = cloudsync_memory_alloc((uint64_t)out_len + 1); + if (!out) return NULL; + + size_t i = 0; + size_t j = 0; + while (i < len) { + uint32_t octet_a = i < len ? src[i++] : 0; + uint32_t octet_b = i < len ? src[i++] : 0; + uint32_t octet_c = i < len ? src[i++] : 0; + uint32_t triple = (octet_a << 16) | (octet_b << 8) | octet_c; + + out[j++] = table[(triple >> 18) & 0x3f]; + out[j++] = table[(triple >> 12) & 0x3f]; + out[j++] = table[(triple >> 6) & 0x3f]; + out[j++] = table[triple & 0x3f]; + } + + if (len % 3 == 1) { + out[out_len - 1] = '='; + out[out_len - 2] = '='; + } else if (len % 3 == 2) { + out[out_len - 1] = '='; + } + + out[out_len] = '\0'; + return out; +} + +static char *network_apply_json_payload(const char *transport_key, const char *transport_value, + int db_version_min, int db_version_max) { + if (!transport_key || !transport_value) return NULL; + + char *escaped_value = json_escape_string(transport_value); + if (!escaped_value) return NULL; + + size_t requested = strlen(transport_key) + strlen(escaped_value) + 128; + char *json_payload = cloudsync_memory_alloc((uint64_t)requested); + if (!json_payload) { + cloudsync_memory_free(escaped_value); + return NULL; + } + + snprintf(json_payload, requested, + "{\"%s\":\"%s\", \"dbVersionMin\":%d, \"dbVersionMax\":%d}", + transport_key, escaped_value, db_version_min, db_version_max); + + cloudsync_memory_free(escaped_value); + return json_payload; +} + static const char *network_compute_status(int64_t last_optimistic, int64_t last_confirmed, int gaps_size, int64_t local_version) { if (last_optimistic < 0 || last_confirmed < 0) return "error"; @@ -1002,43 +1356,78 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc, NETWORK_RESULT res; if (blob != NULL && blob_size > 0) { - // there is data to send - res = network_receive_buffer(netdata, netdata->upload_endpoint, netdata->authentication, true, false, NULL, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers)); - if (res.code != CLOUDSYNC_NETWORK_BUFFER) { + int db_version_min = db_version+1; + int db_version_max = (int)new_db_version; + if (db_version_min > db_version_max) db_version_min = db_version_max; + + #ifdef CLOUDSYNC_NETWORK_TRACE + fprintf(stderr, + "[cloudsync-network] send_changes blob_size=%d fast-lane:%s\n", + blob_size, + blob_size <= CLOUDSYNC_NETWORK_FAST_LANE_MAX_BLOB_SIZE ? "true" : "false"); + #endif + + if (blob_size <= CLOUDSYNC_NETWORK_FAST_LANE_MAX_BLOB_SIZE) { + char *blob_base64 = network_base64_encode((const unsigned char *)blob, (size_t)blob_size); cloudsync_memory_free(blob); - network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to receive upload URL"); - network_result_cleanup(&res); - return SQLITE_ERROR; - } - - char *s3_url = json_extract_string(res.buffer, res.blen, "url"); - if (!s3_url) { + if (!blob_base64) { + sqlite3_result_error(context, "cloudsync_network_send_changes: unable to encode BLOB changes.", -1); + sqlite3_result_error_code(context, SQLITE_NOMEM); + return SQLITE_NOMEM; + } + + char *json_payload = network_apply_json_payload("blob", blob_base64, db_version_min, db_version_max); + cloudsync_memory_free(blob_base64); + if (!json_payload) { + sqlite3_result_error(context, "cloudsync_network_send_changes: unable to allocate apply request payload.", -1); + sqlite3_result_error_code(context, SQLITE_NOMEM); + return SQLITE_NOMEM; + } + + res = network_receive_buffer(netdata, netdata->apply_endpoint, netdata->authentication, true, true, json_payload, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers)); + cloudsync_memory_free(json_payload); + } else { + // bulk lane: stage the payload through the upload endpoint and apply by URL + res = network_receive_buffer(netdata, netdata->upload_endpoint, netdata->authentication, true, false, NULL, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers)); + if (res.code != CLOUDSYNC_NETWORK_BUFFER) { + cloudsync_memory_free(blob); + network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to receive upload URL"); + network_result_cleanup(&res); + return SQLITE_ERROR; + } + + char *s3_url = json_extract_string(res.buffer, res.blen, "url"); + if (!s3_url) { + cloudsync_memory_free(blob); + sqlite3_result_error(context, "cloudsync_network_send_changes: missing 'url' in upload response.", -1); + network_result_cleanup(&res); + return SQLITE_ERROR; + } + bool sent = network_send_buffer(netdata, s3_url, NULL, blob, blob_size); cloudsync_memory_free(blob); - sqlite3_result_error(context, "cloudsync_network_send_changes: missing 'url' in upload response.", -1); - network_result_cleanup(&res); - return SQLITE_ERROR; - } - bool sent = network_send_buffer(netdata, s3_url, NULL, blob, blob_size); - cloudsync_memory_free(blob); - if (sent == false) { + if (sent == false) { + cloudsync_memory_free(s3_url); + network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to upload BLOB changes to remote host."); + network_result_cleanup(&res); + return SQLITE_ERROR; + } + + char *json_payload = network_apply_json_payload("url", s3_url, db_version_min, db_version_max); cloudsync_memory_free(s3_url); - network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to upload BLOB changes to remote host."); + if (!json_payload) { + sqlite3_result_error(context, "cloudsync_network_send_changes: unable to allocate apply request payload.", -1); + sqlite3_result_error_code(context, SQLITE_NOMEM); + network_result_cleanup(&res); + return SQLITE_NOMEM; + } + + // free res network_result_cleanup(&res); - return SQLITE_ERROR; + + // notify remote host that we successfully uploaded changes + res = network_receive_buffer(netdata, netdata->apply_endpoint, netdata->authentication, true, true, json_payload, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers)); + cloudsync_memory_free(json_payload); } - - int db_version_min = db_version+1; - int db_version_max = (int)new_db_version; - if (db_version_min > db_version_max) db_version_min = db_version_max; - char json_payload[4096]; - snprintf(json_payload, sizeof(json_payload), "{\"url\":\"%s\", \"dbVersionMin\":%d, \"dbVersionMax\":%d}", s3_url, db_version_min, db_version_max); - cloudsync_memory_free(s3_url); - - // free res - network_result_cleanup(&res); - - // notify remote host that we succesfully uploaded changes - res = network_receive_buffer(netdata, netdata->apply_endpoint, netdata->authentication, true, true, json_payload, cloudsync_default_headers, ARRAY_LEN(cloudsync_default_headers)); } else { // there is no data to send, just check the status to update the db_version value in settings and to reply the status new_db_version = db_version; @@ -1059,7 +1448,7 @@ int cloudsync_network_send_changes_internal (sqlite3_context *context, int argc, apply_failure_json = json_extract_failure_stage(res.buffer, res.blen, "apply"); check_failure_json = json_extract_failure_stage(res.buffer, res.blen, "check"); } else if (res.code != CLOUDSYNC_NETWORK_OK) { - network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to notify BLOB upload to remote host."); + network_result_to_sqlite_error(context, res, "cloudsync_network_send_changes unable to apply changes to remote host."); network_result_cleanup(&res); return SQLITE_ERROR; } diff --git a/src/network/network.m b/src/network/network.m index 0f13478..89c00f9 100644 --- a/src/network/network.m +++ b/src/network/network.m @@ -14,9 +14,17 @@ void network_buffer_cleanup (void *xdata) { } bool network_send_buffer(network_data *data, const char *endpoint, const char *authentication, const void *blob, int blob_size) { +#ifdef CLOUDSYNC_NETWORK_TRACE + double trace_start_ms = network_trace_now_ms(); +#endif NSString *urlString = [NSString stringWithUTF8String:endpoint]; NSURL *url = [NSURL URLWithString:urlString]; - if (!url) return false; + if (!url) { + #ifdef CLOUDSYNC_NETWORK_TRACE + network_trace_log(data, "PUT", endpoint, 0, CLOUDSYNC_NETWORK_ERROR, (size_t)blob_size, 0, network_trace_now_ms() - trace_start_ms); + #endif + return false; + } NSMutableURLRequest *request = [NSMutableURLRequest requestWithURL:url]; [request setHTTPMethod:@"PUT"]; @@ -37,6 +45,7 @@ bool network_send_buffer(network_data *data, const char *endpoint, const char *a [request setHTTPBody:bodyData]; __block bool success = false; + __block NSInteger statusCode = 0; dispatch_semaphore_t sema = dispatch_semaphore_create(0); NSURLSessionConfiguration *config = [NSURLSessionConfiguration ephemeralSessionConfiguration]; @@ -47,7 +56,7 @@ bool network_send_buffer(network_data *data, const char *endpoint, const char *a NSURLResponse * _Nullable response, NSError * _Nullable error) { if (!error && [response isKindOfClass:[NSHTTPURLResponse class]]) { - NSInteger statusCode = [(NSHTTPURLResponse *)response statusCode]; + statusCode = [(NSHTTPURLResponse *)response statusCode]; success = (statusCode >= 200 && statusCode < 300); } dispatch_semaphore_signal(sema); @@ -57,11 +66,28 @@ bool network_send_buffer(network_data *data, const char *endpoint, const char *a dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER); [session finishTasksAndInvalidate]; + #ifdef CLOUDSYNC_NETWORK_TRACE + network_trace_log(data, "PUT", endpoint, (long)statusCode, + success ? CLOUDSYNC_NETWORK_OK : CLOUDSYNC_NETWORK_ERROR, + (size_t)blob_size, + success ? (size_t)blob_size : 0, + network_trace_now_ms() - trace_start_ms); + #endif + return success; } NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, const char *authentication, bool zero_terminated, bool is_post_request, char *json_payload, const char **extra_headers, int nextra_headers) { +#ifdef CLOUDSYNC_NETWORK_TRACE + double trace_start_ms = network_trace_now_ms(); + size_t request_bytes = json_payload ? strlen(json_payload) : 0; +#endif + const char *method = (json_payload || is_post_request) ? "POST" : "GET"; + bool using_ticket = network_data_should_use_ticket(data, endpoint, authentication); +#ifndef CLOUDSYNC_NETWORK_TRACE + (void)method; +#endif NSString *urlString = [NSString stringWithUTF8String:endpoint]; NSURL *url = [NSURL URLWithString:urlString]; @@ -72,6 +98,9 @@ NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, result.buffer = (char *)msg.UTF8String; result.xdata = (void *)CFBridgingRetain(msg); result.xfree = network_buffer_cleanup; + #ifdef CLOUDSYNC_NETWORK_TRACE + network_trace_log(data, method, endpoint, 0, result.code, request_bytes, 0, network_trace_now_ms() - trace_start_ms); + #endif return result; } @@ -97,6 +126,10 @@ NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, NSString *authString = [NSString stringWithFormat:@"Bearer %s", authentication]; [request setValue:authString forHTTPHeaderField:@"Authorization"]; } + if (using_ticket) { + char *ticket = network_data_get_ticket(data); + [request setValue:[NSString stringWithUTF8String:ticket] forHTTPHeaderField:@CLOUDSYNC_HEADER_TICKET]; + } if (json_payload) { [request setValue:@"application/json" forHTTPHeaderField:@"Content-Type"]; @@ -110,6 +143,8 @@ NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, __block NSString *responseError = nil; __block NSInteger statusCode = 0; __block NSInteger errorCode = 0; + __block NSString *responseTicket = nil; + __block NSString *responseTicketExpiresAt = nil; dispatch_semaphore_t sema = dispatch_semaphore_create(0); @@ -122,7 +157,18 @@ NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, errorCode = [error code]; } if ([response isKindOfClass:[NSHTTPURLResponse class]]) { - statusCode = [(NSHTTPURLResponse *)response statusCode]; + NSHTTPURLResponse *httpResponse = (NSHTTPURLResponse *)response; + statusCode = [httpResponse statusCode]; + NSDictionary *headers = [httpResponse allHeaderFields]; + for (id key in headers) { + NSString *name = [key description]; + NSString *value = [[headers objectForKey:key] description]; + if ([name caseInsensitiveCompare:@CLOUDSYNC_HEADER_TICKET] == NSOrderedSame) { + responseTicket = value; + } else if ([name caseInsensitiveCompare:@CLOUDSYNC_HEADER_TICKET_EXPIRES_AT] == NSOrderedSame) { + responseTicketExpiresAt = value; + } + } } dispatch_semaphore_signal(sema); }]; @@ -131,10 +177,23 @@ NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER); [session finishTasksAndInvalidate]; + if (!responseError && (statusCode >= 200 && statusCode < 300) && responseTicket && [responseTicket length] > 0) { + network_data_update_ticket(data, [responseTicket UTF8String], + responseTicketExpiresAt ? [responseTicketExpiresAt UTF8String] : NULL); + } + if (!responseError && (statusCode >= 200 && statusCode < 300)) { // check if OK should be returned if (responseData == nil || [responseData length] == 0) { - return (NETWORK_RESULT){CLOUDSYNC_NETWORK_OK, NULL, 0, NULL, NULL}; + NETWORK_RESULT result = {CLOUDSYNC_NETWORK_OK, NULL, 0, NULL, NULL}; + #ifdef CLOUDSYNC_NETWORK_TRACE + fprintf(stderr, + "[cloudsync-network] endpoint=%s using_ticket=%s\n", + network_trace_endpoint_name(data, endpoint), + using_ticket ? "true" : "false"); + network_trace_log(data, method, endpoint, (long)statusCode, result.code, request_bytes, 0, network_trace_now_ms() - trace_start_ms); + #endif + return result; } // otherwise return a buffer @@ -144,7 +203,11 @@ NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, NSString *utf8String = [[NSString alloc] initWithData:responseData encoding:NSUTF8StringEncoding]; if (!utf8String) { NSString *msg = @"Response is not valid UTF-8"; - return (NETWORK_RESULT){CLOUDSYNC_NETWORK_ERROR, (char *)msg.UTF8String, 0, (void *)CFBridgingRetain(msg), network_buffer_cleanup}; + NETWORK_RESULT error_result = {CLOUDSYNC_NETWORK_ERROR, (char *)msg.UTF8String, 0, (void *)CFBridgingRetain(msg), network_buffer_cleanup}; + #ifdef CLOUDSYNC_NETWORK_TRACE + network_trace_log(data, method, endpoint, (long)statusCode, error_result.code, request_bytes, 0, network_trace_now_ms() - trace_start_ms); + #endif + return error_result; } result.buffer = (char *)utf8String.UTF8String; result.xdata = (void *)CFBridgingRetain(utf8String); @@ -155,6 +218,13 @@ NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, result.blen = [responseData length]; result.xfree = network_buffer_cleanup; + #ifdef CLOUDSYNC_NETWORK_TRACE + fprintf(stderr, + "[cloudsync-network] endpoint=%s using_ticket=%s\n", + network_trace_endpoint_name(data, endpoint), + using_ticket ? "true" : "false"); + network_trace_log(data, method, endpoint, (long)statusCode, result.code, request_bytes, result.blen, network_trace_now_ms() - trace_start_ms); + #endif return result; } @@ -178,5 +248,12 @@ NETWORK_RESULT network_receive_buffer(network_data *data, const char *endpoint, result.xfree = network_buffer_cleanup; result.blen = responseError ? (size_t)errorCode : (size_t)statusCode; + #ifdef CLOUDSYNC_NETWORK_TRACE + fprintf(stderr, + "[cloudsync-network] endpoint=%s using_ticket=%s\n", + network_trace_endpoint_name(data, endpoint), + using_ticket ? "true" : "false"); + network_trace_log(data, method, endpoint, (long)statusCode, result.code, request_bytes, 0, network_trace_now_ms() - trace_start_ms); + #endif return result; } diff --git a/src/network/network_private.h b/src/network/network_private.h index c282e24..dae4774 100644 --- a/src/network/network_private.h +++ b/src/network/network_private.h @@ -16,6 +16,8 @@ #define CLOUDSYNC_ENDPOINT_STATUS "status" #define CLOUDSYNC_HEADER_ORG "X-CloudSync-Org" #define CLOUDSYNC_HEADER_VERSION "X-CloudSync-Version" +#define CLOUDSYNC_HEADER_TICKET "X-CloudSync-Ticket" +#define CLOUDSYNC_HEADER_TICKET_EXPIRES_AT "X-CloudSync-Ticket-Expires-At" // CLOUDSYNC_VERSION is defined in cloudsync.h — include it before this header at use sites. #define CLOUDSYNC_HEADER_VERSION_LINE CLOUDSYNC_HEADER_VERSION ": " CLOUDSYNC_VERSION #define CLOUDSYNC_HEADER_CHECK_CAPABILITIES "X-CloudSync-Capabilities: check-status-response" @@ -36,10 +38,20 @@ typedef struct { char *network_data_get_siteid (network_data *data); char *network_data_get_orgid (network_data *data); +char *network_data_get_ticket (network_data *data); +bool network_data_should_use_ticket (network_data *data, const char *endpoint, const char *authentication); +void network_data_update_ticket (network_data *data, const char *ticket, const char *expires_at); bool network_data_set_endpoints (network_data *data, char *auth, char *check, char *upload, char *apply, char *status); bool network_send_buffer(network_data *data, const char *endpoint, const char *authentication, const void *blob, int blob_size); NETWORK_RESULT network_receive_buffer (network_data *data, const char *endpoint, const char *authentication, bool zero_terminated, bool is_post_request, char *json_payload, const char **extra_headers, int nextra_headers); +#ifdef CLOUDSYNC_NETWORK_TRACE +const char *network_trace_endpoint_name(network_data *data, const char *endpoint); +const char *network_trace_result_name(int code); +void network_trace_log(network_data *data, const char *method, const char *endpoint, long http_status, int result_code, size_t request_bytes, size_t bytes, double elapsed_ms); +double network_trace_now_ms(void); +#endif + #endif diff --git a/test/integration.c b/test/integration.c index 0e5a2e0..6966fde 100644 --- a/test/integration.c +++ b/test/integration.c @@ -517,7 +517,7 @@ int test_failure_path (const char *db_path) { rc = open_load_ext(db_path, &db); RCHECK - rc = db_exec(db, "CREATE TABLE IF NOT EXISTS failure_users (id TEXT PRIMARY KEY NOT NULL, name TEXT NOT NULL DEFAULT '');"); RCHECK + rc = db_exec(db, "CREATE TABLE IF NOT EXISTS failure_users (id TEXT PRIMARY KEY NOT NULL, name TEXT NOT NULL DEFAULT '', value BLOB);"); RCHECK rc = db_exec(db, "SELECT cloudsync_init('failure_users');"); RCHECK char network_init[1024]; @@ -534,10 +534,12 @@ int test_failure_path (const char *db_path) { } // Insert a row so cloudsync_network_send_changes has a payload to upload. + // Insert a 1MB value to skip the fast-lane and force using the normal s3 path with async job, + // otherwise the error would be immediately returned by the apply endpoint. char value[UUID_STR_MAXLEN]; cloudsync_uuid_v7_string(value, true); char sql[256]; - snprintf(sql, sizeof(sql), "INSERT INTO failure_users (id, name) VALUES ('%s', '%s');", value, value); + snprintf(sql, sizeof(sql), "INSERT INTO failure_users (id, name, value) VALUES ('%s', '%s', randomblob(1048576));", value, value); rc = db_exec(db, sql); RCHECK // First invocation — primes the server. Failures may not yet be reported. diff --git a/test/sync_bench.c b/test/sync_bench.c new file mode 100644 index 0000000..946fffa --- /dev/null +++ b/test/sync_bench.c @@ -0,0 +1,653 @@ +// +// sync_bench.c +// cloudsync +// +// Measures end-to-end sync latency from one local SQLite database to another. +// + +#include +#include +#include +#include +#ifdef CLOUDSYNC_NETWORK_TRACE +#include +#endif +#include "sqlite3.h" +#include "utils.h" + +#ifdef _WIN32 +#include +#else +#include +#endif + +#define DB_A_PATH "dist/sync-bench-a.sqlite" +#define DB_B_PATH "dist/sync-bench-b.sqlite" +#define EXT_PATH "./dist/cloudsync" +#define DEFAULT_POLL_DELAY_MS 250 +#define DEFAULT_MAX_POLLS 40 +#define DEFAULT_RANDOM_BLOB_SIZE_BYTES (100 * 1024) +#define DEFAULT_CLEANUP_OLDER_THAN_SECONDS (24 * 60 * 60) + +typedef struct { + const char *operation; + int attempt; + int sqlite_rc; + int rows_received; + double started_ms; + double ended_ms; + double elapsed_ms; + char *result_json; +} sync_bench_request; + +typedef struct { + int local_version; + int server_version; + char *status; +} sync_bench_send_summary; + +#ifdef CLOUDSYNC_NETWORK_TRACE +static void bench_trace(const char *fmt, ...) { + va_list args; + va_start(args, fmt); + fprintf(stderr, "[sync-bench] "); + vfprintf(stderr, fmt, args); + fprintf(stderr, "\n"); + va_end(args); +} +#else +#define bench_trace(...) ((void)0) +#endif + +static double monotonic_ms(void) { +#ifdef _WIN32 + LARGE_INTEGER freq; + LARGE_INTEGER counter; + QueryPerformanceFrequency(&freq); + QueryPerformanceCounter(&counter); + return ((double)counter.QuadPart * 1000.0) / (double)freq.QuadPart; +#else + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ((double)ts.tv_sec * 1000.0) + ((double)ts.tv_nsec / 1000000.0); +#endif +} + +static char *str_dup(const char *value) { + if (!value) value = ""; + size_t len = strlen(value); + char *copy = (char *)malloc(len + 1); + if (!copy) return NULL; + memcpy(copy, value, len + 1); + return copy; +} + +static int env_int(const char *name, int default_value) { + const char *value = getenv(name); + if (!value || !*value) return default_value; + char *end = NULL; + long parsed = strtol(value, &end, 10); + if (!end || *end != '\0' || parsed < 0 || parsed > 1000000) return default_value; + return (int)parsed; +} + +static int db_exec(sqlite3 *db, const char *sql) { + char *errmsg = NULL; + int rc = sqlite3_exec(db, sql, NULL, NULL, &errmsg); + if (rc != SQLITE_OK) { + fprintf(stderr, "Error while executing %s: %s\n", sql, errmsg ? errmsg : sqlite3_errmsg(db)); + sqlite3_free(errmsg); + } + return rc; +} + +static int query_text(sqlite3 *db, const char *sql, char **out) { + sqlite3_stmt *stmt = NULL; + int rc = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL); + if (rc != SQLITE_OK) { + fprintf(stderr, "Error while preparing %s: %s\n", sql, sqlite3_errmsg(db)); + return rc; + } + + rc = sqlite3_step(stmt); + if (rc == SQLITE_ROW) { + const unsigned char *value = sqlite3_column_text(stmt, 0); + *out = str_dup((const char *)value); + if (!*out) rc = SQLITE_NOMEM; + else rc = SQLITE_OK; + } else if (rc == SQLITE_DONE) { + *out = NULL; + rc = SQLITE_OK; + } else { + fprintf(stderr, "Error while stepping %s: %s\n", sql, sqlite3_errmsg(db)); + } + + int finalize_rc = sqlite3_finalize(stmt); + if (rc == SQLITE_OK && finalize_rc != SQLITE_OK) rc = finalize_rc; + return rc; +} + +static int timed_query_text(sqlite3 *db, const char *sql, char **out, double *started_ms, double *ended_ms) { + *started_ms = monotonic_ms(); + int rc = query_text(db, sql, out); + *ended_ms = monotonic_ms(); + return rc; +} + +static int open_load_ext(const char *db_path, sqlite3 **out_db) { + bench_trace("step=open-load-extension db_path=%s begin", db_path); + sqlite3 *db = NULL; + int rc = sqlite3_open(db_path, &db); + if (rc != SQLITE_OK) { + fprintf(stderr, "Unable to open %s: %s\n", db_path, db ? sqlite3_errmsg(db) : "unknown error"); + if (db) sqlite3_close(db); + bench_trace("step=open-load-extension db_path=%s end rc=%d", db_path, rc); + return rc; + } + + rc = sqlite3_enable_load_extension(db, 1); + if (rc != SQLITE_OK) { + fprintf(stderr, "Unable to enable load_extension for %s: %s\n", db_path, sqlite3_errmsg(db)); + sqlite3_close(db); + bench_trace("step=open-load-extension db_path=%s end rc=%d", db_path, rc); + return rc; + } + + rc = db_exec(db, "SELECT load_extension('" EXT_PATH "');"); + if (rc != SQLITE_OK) { + sqlite3_close(db); + bench_trace("step=open-load-extension db_path=%s end rc=%d", db_path, rc); + return rc; + } + + *out_db = db; + bench_trace("step=open-load-extension db_path=%s end rc=%d", db_path, SQLITE_OK); + return SQLITE_OK; +} + +static int init_schema(sqlite3 *db, const char *label) { + bench_trace("step=init-schema db=%s begin", label); + int rc = db_exec(db, + "CREATE TABLE IF NOT EXISTS sync_bench_items (" + "id TEXT PRIMARY KEY NOT NULL," + "payload TEXT NOT NULL DEFAULT ''," + "marker TEXT NOT NULL DEFAULT ''," + "random_blob BLOB NOT NULL DEFAULT X''," + "updated_at TEXT NOT NULL DEFAULT ''" + ");"); + if (rc != SQLITE_OK) { + bench_trace("step=init-schema db=%s end rc=%d", label, rc); + return rc; + } + + rc = db_exec(db, "SELECT cloudsync_init('sync_bench_items');"); + bench_trace("step=init-schema db=%s end rc=%d", label, rc); + return rc; +} + +static int init_network(sqlite3 *db, const char *label, const char *database_id, const char *address, const char *apikey) { + char sql[2048]; + if (address && *address) { + snprintf(sql, sizeof(sql), "SELECT cloudsync_network_init_custom('%s', '%s');", address, database_id); + } else { + snprintf(sql, sizeof(sql), "SELECT cloudsync_network_init('%s');", database_id); + } + bench_trace("step=network-init db=%s begin mode=%s", label, (address && *address) ? "custom-address" : "default-address"); + int rc = db_exec(db, sql); + bench_trace("step=network-init db=%s end rc=%d", label, rc); + if (rc != SQLITE_OK) return rc; + + if (apikey && *apikey) { + bench_trace("step=set-apikey db=%s begin", label); + snprintf(sql, sizeof(sql), "SELECT cloudsync_network_set_apikey('%s');", apikey); + rc = db_exec(db, sql); + bench_trace("step=set-apikey db=%s end rc=%d", label, rc); + if (rc != SQLITE_OK) return rc; + } + + bench_trace("step=pre-measure-sync db=%s begin sql=cloudsync_network_sync(500,4)", label); + rc = db_exec(db, "SELECT cloudsync_network_sync(500, 4);"); + bench_trace("step=pre-measure-sync db=%s end rc=%d", label, rc); + return rc; +} + +static int setup_database(const char *label, const char *path, const char *database_id, const char *address, const char *apikey, sqlite3 **out_db) { + bench_trace("step=setup-database db=%s path=%s begin", label, path); + int rc = open_load_ext(path, out_db); + if (rc != SQLITE_OK) { + bench_trace("step=setup-database db=%s end rc=%d", label, rc); + return rc; + } + + rc = init_schema(*out_db, label); + if (rc != SQLITE_OK) { + bench_trace("step=setup-database db=%s end rc=%d", label, rc); + return rc; + } + + rc = init_network(*out_db, label, database_id, address, apikey); + bench_trace("step=setup-database db=%s end rc=%d", label, rc); + return rc; +} + +static int verify_row(sqlite3 *db, const char *id, const char *payload, const char *marker, + const void *random_blob, int random_blob_size, bool *verified) { + sqlite3_stmt *stmt = NULL; + const char *sql = "SELECT payload, marker, random_blob FROM sync_bench_items WHERE id = ?;"; + int rc = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL); + if (rc != SQLITE_OK) { + fprintf(stderr, "Error while preparing verification query: %s\n", sqlite3_errmsg(db)); + return rc; + } + + sqlite3_bind_text(stmt, 1, id, -1, SQLITE_TRANSIENT); + rc = sqlite3_step(stmt); + if (rc == SQLITE_ROW) { + const char *actual_payload = (const char *)sqlite3_column_text(stmt, 0); + const char *actual_marker = (const char *)sqlite3_column_text(stmt, 1); + const void *actual_blob = sqlite3_column_blob(stmt, 2); + int actual_blob_size = sqlite3_column_bytes(stmt, 2); + bool blob_matches = (actual_blob_size == random_blob_size) && + (random_blob_size == 0 || + (actual_blob && random_blob && memcmp(actual_blob, random_blob, (size_t)random_blob_size) == 0)); + *verified = actual_payload && actual_marker && + strcmp(actual_payload, payload) == 0 && + strcmp(actual_marker, marker) == 0 && + blob_matches; + rc = SQLITE_OK; + } else if (rc == SQLITE_DONE) { + *verified = false; + rc = SQLITE_OK; + } else { + fprintf(stderr, "Error while verifying row: %s\n", sqlite3_errmsg(db)); + } + + int finalize_rc = sqlite3_finalize(stmt); + if (rc == SQLITE_OK && finalize_rc != SQLITE_OK) rc = finalize_rc; + return rc; +} + +static int insert_benchmark_row(sqlite3 *db, const char *id, const char *payload, const char *marker, + const void *random_blob, int random_blob_size) { + bench_trace("step=insert-source-row db=db_a row_id=%s random_blob_size_bytes=%d begin", id, random_blob_size); + sqlite3_stmt *stmt = NULL; + const char *sql = "INSERT INTO sync_bench_items (id, payload, marker, random_blob, updated_at) VALUES (?, ?, ?, ?, datetime('now'));"; + int rc = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL); + if (rc != SQLITE_OK) { + fprintf(stderr, "Error while preparing insert: %s\n", sqlite3_errmsg(db)); + bench_trace("step=insert-source-row db=db_a row_id=%s end rc=%d", id, rc); + return rc; + } + + rc = sqlite3_bind_text(stmt, 1, id, -1, SQLITE_TRANSIENT); + if (rc == SQLITE_OK) rc = sqlite3_bind_text(stmt, 2, payload, -1, SQLITE_TRANSIENT); + if (rc == SQLITE_OK) rc = sqlite3_bind_text(stmt, 3, marker, -1, SQLITE_TRANSIENT); + if (rc == SQLITE_OK) rc = sqlite3_bind_blob(stmt, 4, random_blob, random_blob_size, SQLITE_TRANSIENT); + if (rc != SQLITE_OK) { + fprintf(stderr, "Error while binding benchmark row: %s\n", sqlite3_errmsg(db)); + sqlite3_finalize(stmt); + bench_trace("step=insert-source-row db=db_a row_id=%s random_blob_size_bytes=%d end rc=%d", id, random_blob_size, rc); + return rc; + } + + rc = sqlite3_step(stmt); + if (rc == SQLITE_DONE) rc = SQLITE_OK; + else fprintf(stderr, "Error while inserting benchmark row: %s\n", sqlite3_errmsg(db)); + + int finalize_rc = sqlite3_finalize(stmt); + if (rc == SQLITE_OK && finalize_rc != SQLITE_OK) rc = finalize_rc; + bench_trace("step=insert-source-row db=db_a row_id=%s random_blob_size_bytes=%d end rc=%d", id, random_blob_size, rc); + return rc; +} + +static int cleanup_old_benchmark_rows(sqlite3 *db, int older_than_seconds, int *deleted_count) { + if (deleted_count) *deleted_count = 0; + if (older_than_seconds <= 0) { + bench_trace("step=cleanup-old-source-rows db=db_a enabled=false"); + return SQLITE_OK; + } + + char modifier[64]; + snprintf(modifier, sizeof(modifier), "-%d seconds", older_than_seconds); + + bench_trace("step=cleanup-old-source-rows db=db_a older_than_seconds=%d begin", older_than_seconds); + sqlite3_stmt *stmt = NULL; + const char *sql = + "DELETE FROM sync_bench_items " + "WHERE marker LIKE 'sync-bench-%' " + "AND updated_at < datetime('now', ?);"; + int rc = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL); + if (rc != SQLITE_OK) { + fprintf(stderr, "Error while preparing cleanup delete: %s\n", sqlite3_errmsg(db)); + bench_trace("step=cleanup-old-source-rows db=db_a end rc=%d deleted=0", rc); + return rc; + } + + rc = sqlite3_bind_text(stmt, 1, modifier, -1, SQLITE_TRANSIENT); + if (rc != SQLITE_OK) { + fprintf(stderr, "Error while binding cleanup delete: %s\n", sqlite3_errmsg(db)); + sqlite3_finalize(stmt); + bench_trace("step=cleanup-old-source-rows db=db_a end rc=%d deleted=0", rc); + return rc; + } + + rc = sqlite3_step(stmt); + if (rc == SQLITE_DONE) { + rc = SQLITE_OK; + if (deleted_count) *deleted_count = sqlite3_changes(db); + } else { + fprintf(stderr, "Error while deleting old benchmark rows: %s\n", sqlite3_errmsg(db)); + } + + int finalize_rc = sqlite3_finalize(stmt); + if (rc == SQLITE_OK && finalize_rc != SQLITE_OK) rc = finalize_rc; + bench_trace("step=cleanup-old-source-rows db=db_a end rc=%d deleted=%d", rc, deleted_count ? *deleted_count : 0); + return rc; +} + +static int json_int_at_path(sqlite3 *db, const char *json, const char *path, int default_value) { + sqlite3_stmt *stmt = NULL; + int value = default_value; + int rc = sqlite3_prepare_v2(db, "SELECT json_extract(?, ?);", -1, &stmt, NULL); + if (rc != SQLITE_OK) return default_value; + sqlite3_bind_text(stmt, 1, json ? json : "", -1, SQLITE_TRANSIENT); + sqlite3_bind_text(stmt, 2, path, -1, SQLITE_TRANSIENT); + rc = sqlite3_step(stmt); + if (rc == SQLITE_ROW && sqlite3_column_type(stmt, 0) != SQLITE_NULL) value = sqlite3_column_int(stmt, 0); + sqlite3_finalize(stmt); + return value; +} + +static char *json_text_at_path(sqlite3 *db, const char *json, const char *path) { + sqlite3_stmt *stmt = NULL; + char *value = NULL; + int rc = sqlite3_prepare_v2(db, "SELECT json_extract(?, ?);", -1, &stmt, NULL); + if (rc != SQLITE_OK) return NULL; + sqlite3_bind_text(stmt, 1, json ? json : "", -1, SQLITE_TRANSIENT); + sqlite3_bind_text(stmt, 2, path, -1, SQLITE_TRANSIENT); + rc = sqlite3_step(stmt); + if (rc == SQLITE_ROW && sqlite3_column_type(stmt, 0) != SQLITE_NULL) { + value = str_dup((const char *)sqlite3_column_text(stmt, 0)); + } + sqlite3_finalize(stmt); + return value; +} + +static int timed_request(sqlite3 *db, sync_bench_request *request, const char *operation, int attempt, const char *sql) { + request->operation = operation; + request->attempt = attempt; + request->rows_received = -1; + request->result_json = NULL; + request->sqlite_rc = timed_query_text(db, sql, &request->result_json, &request->started_ms, &request->ended_ms); + request->elapsed_ms = request->ended_ms - request->started_ms; + if (strcmp(operation, "check") == 0 && request->result_json) { + request->rows_received = json_int_at_path(db, request->result_json, "$.receive.rows", -1); + } + return request->sqlite_rc; +} + +static void json_print_escaped(const char *value) { + putchar('"'); + if (value) { + for (const unsigned char *p = (const unsigned char *)value; *p; p++) { + switch (*p) { + case '\\': printf("\\\\"); break; + case '"': printf("\\\""); break; + case '\n': printf("\\n"); break; + case '\r': printf("\\r"); break; + case '\t': printf("\\t"); break; + default: + if (*p < 0x20) printf("\\u%04x", *p); + else putchar(*p); + } + } + } + putchar('"'); +} + +static void print_text_report(const char *database_id, int poll_delay_ms, int max_polls, int random_blob_size, + int cleanup_older_than_seconds, int cleanup_deleted_rows, + const char *row_id, bool applied, + int polls, double total_ms, double verify_ms, double request_ms, double poll_sleep_ms, + double measured_overhead_ms, sync_bench_send_summary send_summary, + sync_bench_request *requests, int request_count) { + printf("\nSync Performance Benchmark\n"); + printf("database_id: %s\n", database_id); + printf("poll_delay_ms: %d\n", poll_delay_ms); + printf("max_polls: %d\n", max_polls); + printf("random_blob_size_bytes: %d\n", random_blob_size); + printf("cleanup_older_than_seconds: %d\n", cleanup_older_than_seconds); + printf("cleanup_deleted_rows: %d\n", cleanup_deleted_rows); + printf("row_id: %s\n", row_id); + printf("\nRequests:\n"); + for (int i = 0; i < request_count; i++) { + if (strcmp(requests[i].operation, "check") == 0) { + printf("%s[%d] %.2f ms rc=%d rows=%d\n", requests[i].operation, requests[i].attempt, + requests[i].elapsed_ms, requests[i].sqlite_rc, requests[i].rows_received); + } else { + printf("%s[%d] %.2f ms rc=%d status=%s localVersion=%d serverVersion=%d\n", + requests[i].operation, requests[i].attempt, requests[i].elapsed_ms, requests[i].sqlite_rc, + send_summary.status ? send_summary.status : "unknown", + send_summary.local_version, send_summary.server_version); + } + } + printf("\nResult:\n"); + printf("applied: %s\n", applied ? "true" : "false"); + printf("polls: %d\n", polls); + printf("total_send_to_apply_check_end_ms: %.2f\n", total_ms); + printf("network_request_elapsed_ms: %.2f\n", request_ms); + printf("poll_sleep_elapsed_ms: %.2f\n", poll_sleep_ms); + printf("local_overhead_elapsed_ms: %.2f\n", measured_overhead_ms); + printf("verification_select_ms: %.2f\n", verify_ms); +} + +static void print_json_report(int poll_delay_ms, int max_polls, int random_blob_size, + int cleanup_older_than_seconds, int cleanup_deleted_rows, + const char *row_id, bool applied, + int polls, double total_ms, double verify_ms, double request_ms, double poll_sleep_ms, + double measured_overhead_ms, sync_bench_send_summary send_summary, + sync_bench_request *requests, int request_count) { + printf("{\n"); + printf(" \"applied\": %s,\n", applied ? "true" : "false"); + printf(" \"pollDelayMs\": %d,\n", poll_delay_ms); + printf(" \"maxPolls\": %d,\n", max_polls); + printf(" \"randomBlobSizeBytes\": %d,\n", random_blob_size); + printf(" \"cleanupOlderThanSeconds\": %d,\n", cleanup_older_than_seconds); + printf(" \"cleanupDeletedRows\": %d,\n", cleanup_deleted_rows); + printf(" \"polls\": %d,\n", polls); + printf(" \"rowId\": "); json_print_escaped(row_id); printf(",\n"); + printf(" \"totalSendToApplyCheckEndMs\": %.2f,\n", total_ms); + printf(" \"networkRequestElapsedMs\": %.2f,\n", request_ms); + printf(" \"pollSleepElapsedMs\": %.2f,\n", poll_sleep_ms); + printf(" \"localOverheadElapsedMs\": %.2f,\n", measured_overhead_ms); + printf(" \"verificationSelectMs\": %.2f,\n", verify_ms); + printf(" \"send\": {\"status\": "); json_print_escaped(send_summary.status); + printf(", \"localVersion\": %d, \"serverVersion\": %d},\n", send_summary.local_version, send_summary.server_version); + printf(" \"requests\": [\n"); + for (int i = 0; i < request_count; i++) { + printf(" {\"operation\": "); json_print_escaped(requests[i].operation); + printf(", \"attempt\": %d, \"sqliteRc\": %d, \"elapsedMs\": %.2f", requests[i].attempt, + requests[i].sqlite_rc, requests[i].elapsed_ms); + if (strcmp(requests[i].operation, "check") == 0) printf(", \"rows\": %d", requests[i].rows_received); + printf(", \"result\": "); json_print_escaped(requests[i].result_json); + printf("}%s\n", i + 1 == request_count ? "" : ","); + } + printf(" ]\n"); + printf("}\n"); +} + +static void free_requests(sync_bench_request *requests, int request_count) { + for (int i = 0; i < request_count; i++) free(requests[i].result_json); +} + +int main(void) { + int rc = SQLITE_OK; + sqlite3 *db_a = NULL; + sqlite3 *db_b = NULL; + sync_bench_request *requests = NULL; + int request_count = 0; + bool applied = false; + int polls = 0; + double total_ms = 0.0; + double verify_ms = 0.0; + double poll_sleep_ms = 0.0; + double request_ms = 0.0; + double measured_overhead_ms = 0.0; + sync_bench_send_summary send_summary = {-1, -1, NULL}; + int cleanup_deleted_rows = 0; + char row_id[UUID_STR_MAXLEN] = ""; + char marker[96] = ""; + char payload[128] = ""; + unsigned char empty_blob = 0; + void *random_blob = NULL; + + const char *database_id = getenv("SYNC_BENCH_DATABASE_ID"); + const char *address = getenv("SYNC_BENCH_CLOUDSYNC_ADDRESS"); + const char *apikey = getenv("SYNC_BENCH_APIKEY"); + const char *output = getenv("SYNC_BENCH_OUTPUT"); + int poll_delay_ms = env_int("SYNC_BENCH_POLL_DELAY_MS", DEFAULT_POLL_DELAY_MS); + int max_polls = env_int("SYNC_BENCH_MAX_POLLS", DEFAULT_MAX_POLLS); + int random_blob_size = env_int("SYNC_BENCH_RANDOM_BLOB_SIZE_BYTES", DEFAULT_RANDOM_BLOB_SIZE_BYTES); + int cleanup_older_than_seconds = env_int("SYNC_BENCH_CLEANUP_OLDER_THAN_SECONDS", DEFAULT_CLEANUP_OLDER_THAN_SECONDS); + + if (!database_id || !*database_id) { + fprintf(stderr, "Error: SYNC_BENCH_DATABASE_ID not set.\n"); + return SQLITE_MISUSE; + } + + requests = (sync_bench_request *)calloc((size_t)max_polls + 1, sizeof(sync_bench_request)); + if (!requests) return SQLITE_NOMEM; + + remove(DB_A_PATH); + remove(DB_B_PATH); + cloudsync_memory_init(1); + + bench_trace("step=benchmark-setup begin database_id=%s poll_delay_ms=%d max_polls=%d", database_id, poll_delay_ms, max_polls); + rc = setup_database("db_a", DB_A_PATH, database_id, address, apikey, &db_a); + if (rc != SQLITE_OK) goto cleanup; + rc = setup_database("db_b", DB_B_PATH, database_id, address, apikey, &db_b); + if (rc != SQLITE_OK) goto cleanup; + bench_trace("step=benchmark-setup end rc=%d", rc); + + rc = cleanup_old_benchmark_rows(db_a, cleanup_older_than_seconds, &cleanup_deleted_rows); + if (rc != SQLITE_OK) goto cleanup; + if (cleanup_deleted_rows > 0) { + bench_trace("step=cleanup-send db=db_a deleted=%d begin sql=cloudsync_network_send_changes", cleanup_deleted_rows); + rc = db_exec(db_a, "SELECT cloudsync_network_send_changes();"); + bench_trace("step=cleanup-send db=db_a deleted=%d end rc=%d", cleanup_deleted_rows, rc); + if (rc != SQLITE_OK) goto cleanup; + } + + cloudsync_uuid_v7_string(row_id, true); + snprintf(marker, sizeof(marker), "sync-bench-%s", row_id); + snprintf(payload, sizeof(payload), "payload-%s", row_id); + + if (random_blob_size > 0) { + random_blob = malloc((size_t)random_blob_size); + if (!random_blob) { + rc = SQLITE_NOMEM; + goto cleanup; + } + sqlite3_randomness(random_blob_size, random_blob); + } else { + random_blob = &empty_blob; + } + + rc = insert_benchmark_row(db_a, row_id, payload, marker, random_blob, random_blob_size); + if (rc != SQLITE_OK) goto cleanup; + + bench_trace("step=verify-before-send db=db_b row_id=%s begin", row_id); + rc = verify_row(db_b, row_id, payload, marker, random_blob, random_blob_size, &applied); + bench_trace("step=verify-before-send db=db_b row_id=%s end rc=%d applied=%s", row_id, rc, applied ? "true" : "false"); + if (rc != SQLITE_OK) goto cleanup; + if (applied) { + fprintf(stderr, "Error: benchmark row already exists in receiver before send.\n"); + rc = SQLITE_ERROR; + goto cleanup; + } + + bench_trace("step=send db=db_a row_id=%s begin sql=cloudsync_network_send_changes", row_id); + rc = timed_request(db_a, &requests[request_count++], "send", 1, "SELECT cloudsync_network_send_changes();"); + bench_trace("step=send db=db_a row_id=%s end rc=%d elapsed_ms=%.2f", row_id, rc, requests[request_count - 1].elapsed_ms); + if (rc != SQLITE_OK) goto cleanup; + send_summary.status = json_text_at_path(db_a, requests[0].result_json, "$.send.status"); + send_summary.local_version = json_int_at_path(db_a, requests[0].result_json, "$.send.localVersion", -1); + send_summary.server_version = json_int_at_path(db_a, requests[0].result_json, "$.send.serverVersion", -1); + double total_start_ms = requests[0].started_ms; + + for (int i = 0; i < max_polls; i++) { + if (i > 0 && poll_delay_ms > 0) { + bench_trace("step=poll-sleep attempt=%d delay_ms=%d begin", i + 1, poll_delay_ms); + double sleep_start_ms = monotonic_ms(); + sqlite3_sleep(poll_delay_ms); + double sleep_elapsed_ms = monotonic_ms() - sleep_start_ms; + poll_sleep_ms += sleep_elapsed_ms; + bench_trace("step=poll-sleep attempt=%d end elapsed_ms=%.2f", i + 1, sleep_elapsed_ms); + } + bench_trace("step=check db=db_b attempt=%d row_id=%s begin sql=cloudsync_network_check_changes", i + 1, row_id); + rc = timed_request(db_b, &requests[request_count++], "check", i + 1, "SELECT cloudsync_network_check_changes();"); + polls = i + 1; + bench_trace("step=check db=db_b attempt=%d row_id=%s end rc=%d rows=%d elapsed_ms=%.2f", i + 1, row_id, rc, requests[request_count - 1].rows_received, requests[request_count - 1].elapsed_ms); + if (rc != SQLITE_OK) goto cleanup; + + bench_trace("step=verify-after-check db=db_b attempt=%d row_id=%s begin", i + 1, row_id); + double verify_start_ms = monotonic_ms(); + rc = verify_row(db_b, row_id, payload, marker, random_blob, random_blob_size, &applied); + double verify_end_ms = monotonic_ms(); + verify_ms = verify_end_ms - verify_start_ms; + bench_trace("step=verify-after-check db=db_b attempt=%d row_id=%s end rc=%d applied=%s elapsed_ms=%.2f", i + 1, row_id, rc, applied ? "true" : "false", verify_ms); + if (rc != SQLITE_OK) goto cleanup; + + if (applied) { + total_ms = requests[request_count - 1].ended_ms - total_start_ms; + break; + } + } + + if (!applied) { + total_ms = request_count > 0 ? requests[request_count - 1].ended_ms - requests[0].started_ms : 0.0; + rc = SQLITE_BUSY; + } + + for (int i = 0; i < request_count; i++) request_ms += requests[i].elapsed_ms; + measured_overhead_ms = total_ms - request_ms - poll_sleep_ms; + if (measured_overhead_ms < 0.0 && measured_overhead_ms > -0.01) measured_overhead_ms = 0.0; + +cleanup: + bench_trace("step=report begin rc=%d applied=%s request_count=%d", rc, applied ? "true" : "false", request_count); + if (output && strcmp(output, "json") == 0) { + print_json_report(poll_delay_ms, max_polls, random_blob_size, cleanup_older_than_seconds, cleanup_deleted_rows, + row_id, applied, polls, total_ms, verify_ms, + request_ms, poll_sleep_ms, measured_overhead_ms, send_summary, + requests, request_count); + } else { + print_text_report(database_id, poll_delay_ms, max_polls, random_blob_size, cleanup_older_than_seconds, cleanup_deleted_rows, + row_id, applied, polls, total_ms, verify_ms, + request_ms, poll_sleep_ms, measured_overhead_ms, send_summary, + requests, request_count); + } + bench_trace("step=report end rc=%d", rc); + + if (!applied && rc == SQLITE_BUSY) { + fprintf(stderr, "Error: row was not applied to receiver after %d polls.\n", max_polls); + } + + if (db_a) { + bench_trace("step=terminate db=db_a begin"); + db_exec(db_a, "SELECT cloudsync_terminate();"); + sqlite3_close(db_a); + bench_trace("step=terminate db=db_a end"); + } + if (db_b) { + bench_trace("step=terminate db=db_b begin"); + db_exec(db_b, "SELECT cloudsync_terminate();"); + sqlite3_close(db_b); + bench_trace("step=terminate db=db_b end"); + } + free_requests(requests, request_count); + free(send_summary.status); + if (random_blob && random_blob != &empty_blob) free(random_blob); + free(requests); + cloudsync_memory_finalize(); + return rc == SQLITE_OK ? 0 : rc; +}