build-update-repo: Spawn subprocesses when generating deltas

This means we can be parallel just like we were with threads,
but we're not using an enourmous amount of memory.
tingping/wmclass
Alexander Larsson 2017-04-19 22:16:17 +02:00
parent 9e80b3a1c1
commit b4ee8581c3
1 changed files with 78 additions and 90 deletions

View File

@ -57,78 +57,6 @@ static GOptionEntry options[] = {
{ NULL }
};
typedef struct {
OstreeRepo *repo;
GVariant *params;
char *ref;
char *from;
char *to;
} DeltaData;
static void
delta_data_free (DeltaData *data)
{
g_object_unref (data->repo);
g_variant_unref (data->params);
g_free (data->ref);
g_free (data->from);
g_free (data->to);
g_free (data);
}
static DeltaData *
delta_data_push (GThreadPool *pool,
OstreeRepo *repo,
GVariant *params,
const char *ref,
const char *from,
const char *to,
GError **error)
{
DeltaData *data = g_new0 (DeltaData, 1);
data->repo = g_object_ref (repo);
data->params = g_variant_ref (params);
data->ref = g_strdup (ref);
data->from = g_strdup (from);
data->to = g_strdup (to);
if (!g_thread_pool_push (pool, data, error))
{
delta_data_free (data);
return NULL;
}
return data;
}
static void
generate_delta_thread (gpointer _data,
gpointer user_data)
{
DeltaData *data = (DeltaData*) _data;
g_autoptr(GError) error = NULL;
if (data->from == NULL)
g_print (_("Generating delta: %s (%.10s)\n"), data->ref, data->to);
else
g_print (_("Generating delta: %s (%.10s-%.10s)\n"), data->ref, data->from, data->to);
if (!ostree_repo_static_delta_generate (data->repo, OSTREE_STATIC_DELTA_GENERATE_OPT_MAJOR,
data->from, data->to, NULL,
data->params,
NULL, &error))
{
if (data->from == NULL)
g_printerr (_("Failed to generate delta %s (%.10s): %s\n"),
data->ref, data->to, error->message);
else
g_printerr (_("Failed to generate delta %s (%.10s-%.10s): %s\n"),
data->ref, data->from, data->to, error->message);
}
delta_data_free (data);
}
static void
_ostree_parse_delta_name (const char *delta_name,
char **out_from,
@ -273,12 +201,76 @@ generate_one_delta (OstreeRepo *repo,
return TRUE;
}
static void
delta_generation_done (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
{
int *n_spawned_delta_generate = user_data;
(*n_spawned_delta_generate)--;
}
static gboolean
spawn_delete_generation (GMainContext *context,
int *n_spawned_delta_generate,
OstreeRepo *repo,
GVariant *params,
const char *ref,
const char *from,
const char *to,
GError **error)
{
g_autoptr(GSubprocessLauncher) launcher = g_subprocess_launcher_new (0);
g_autoptr(GSubprocess) subprocess = NULL;
const char *argv[] = {
"/proc/self/exe",
"build-update-repo",
"--generate-static-delta-ref",
ref,
"--generate-static-delta-to",
to,
NULL, NULL, NULL, NULL
};
int i = 6;
g_autofree char *exe = NULL;
exe = flatpak_readlink ("/proc/self/exe", NULL);
if (exe)
argv[0] = exe;
if (from)
{
argv[i++] = "--generate-static-delta-from";
argv[i++] = from;
}
argv[i++] = flatpak_file_get_path_cached (ostree_repo_get_path (repo));
argv[i++] = NULL;
g_assert (i <= G_N_ELEMENTS (argv));
while (*n_spawned_delta_generate > g_get_num_processors ())
g_main_context_iteration (context, TRUE);
subprocess = g_subprocess_launcher_spawnv (launcher, argv, error);
if (subprocess == NULL)
return FALSE;
(*n_spawned_delta_generate)++;
g_subprocess_wait_async (subprocess, NULL, delta_generation_done, n_spawned_delta_generate);
return TRUE;
}
static gboolean
generate_all_deltas (OstreeRepo *repo,
GPtrArray **unwanted_deltas,
GCancellable *cancellable,
GError **error)
{
g_autoptr(GMainContext) context = g_main_context_new ();
g_autoptr(GHashTable) all_refs = NULL;
g_autoptr(GHashTable) all_deltas_hash = NULL;
g_autoptr(GHashTable) wanted_deltas_hash = NULL;
@ -288,7 +280,7 @@ generate_all_deltas (OstreeRepo *repo,
gpointer key, value;
g_autoptr(GVariantBuilder) parambuilder = NULL;
g_autoptr(GVariant) params = NULL;
GThreadPool *thread_pool;
int n_spawned_delta_generate = 0;
g_print ("Generating static deltas\n");
@ -314,11 +306,7 @@ generate_all_deltas (OstreeRepo *repo,
cancellable, error))
return FALSE;
thread_pool = g_thread_pool_new (generate_delta_thread, NULL,
g_get_num_processors (), FALSE,
error);
if (thread_pool == NULL)
return FALSE;
g_main_context_push_thread_default (context);
g_hash_table_iter_init (&iter, all_refs);
while (g_hash_table_iter_next (&iter, &key, &value))
@ -339,9 +327,9 @@ generate_all_deltas (OstreeRepo *repo,
/* From empty */
if (!g_hash_table_contains (all_deltas_hash, commit))
{
if (!delta_data_push (thread_pool, repo, params,
ref, NULL, commit,
error))
if (!spawn_delete_generation (context, &n_spawned_delta_generate, repo, params,
ref, NULL, commit,
error))
goto error;
}
@ -365,9 +353,9 @@ generate_all_deltas (OstreeRepo *repo,
if (!g_hash_table_contains (all_deltas_hash, from_parent))
{
if (!delta_data_push (thread_pool, repo, params,
ref, parent_commit, commit,
error))
if (!spawn_delete_generation (context, &n_spawned_delta_generate, repo, params,
ref, parent_commit, commit,
error))
goto error;
}
@ -376,6 +364,11 @@ generate_all_deltas (OstreeRepo *repo,
}
}
while (n_spawned_delta_generate > 0)
g_main_context_iteration (context, TRUE);
g_main_context_pop_thread_default (context);
*unwanted_deltas = g_ptr_array_new_with_free_func (g_free);
for (i = 0; i < all_deltas->len; i++)
{
@ -384,15 +377,10 @@ generate_all_deltas (OstreeRepo *repo,
g_ptr_array_add (*unwanted_deltas, g_strdup (delta));
}
/* This block until all are done */
g_thread_pool_free (thread_pool, FALSE, TRUE);
return TRUE;
error:
/* TODO: In this error case we're leaking all the DeltaDatas we have not yet
processed. I don't know how to fix this though... */
g_thread_pool_free (thread_pool, TRUE, FALSE);
g_main_context_pop_thread_default (context);
return FALSE;
}