data partitioning

This commit is contained in:
Brian Quinion
2010-10-26 15:22:41 +00:00
parent 4c2c499860
commit e6d983474b
10 changed files with 273 additions and 511 deletions

View File

@@ -151,21 +151,33 @@ void nominatim_exportCreatePreparedQueries(PGconn * conn)
res = PQprepare(conn, "placex_details",
"select osm_type, osm_id, class, type, name, housenumber, country_code, ST_AsText(geometry), admin_level, rank_address, rank_search from placex where place_id = $1",
1, pg_prepare_params);
if (PQresultStatus(res) != PGRES_COMMAND_OK) exit(EXIT_FAILURE);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Error preparing placex_details: %s", PQerrorMessage(conn));
exit(EXIT_FAILURE);
}
PQclear(res);
pg_prepare_params[0] = PG_OID_INT8;
res = PQprepare(conn, "placex_address",
"select osm_type,osm_id,class,type,distance,cached_rank_address from place_addressline join placex on (address_place_id = placex.place_id) where isaddress and place_addressline.place_id = $1 and address_place_id != place_addressline.place_id order by cached_rank_address asc",
1, pg_prepare_params);
if (PQresultStatus(res) != PGRES_COMMAND_OK) exit(EXIT_FAILURE);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Error preparing placex_address: %s", PQerrorMessage(conn));
exit(EXIT_FAILURE);
}
PQclear(res);
pg_prepare_params[0] = PG_OID_INT8;
res = PQprepare(conn, "placex_names",
"select (each(name)).key,(each(name)).value from (select keyvalueToHStore(name) as name from placex where place_id = $1) as x",
"select (each(name)).key,(each(name)).value from (select name as name from placex where place_id = $1) as x",
1, pg_prepare_params);
if (PQresultStatus(res) != PGRES_COMMAND_OK) exit(EXIT_FAILURE);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Error preparing placex_names: %s", PQerrorMessage(conn));
exit(EXIT_FAILURE);
}
PQclear(res);
}

View File

@@ -51,7 +51,7 @@ void nominatim_index(int rank_min, int rank_max, int num_threads, const char *co
Oid pg_prepare_params[2];
conn = PQconnectdb(conninfo);
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
exit(EXIT_FAILURE);
@@ -59,17 +59,25 @@ void nominatim_index(int rank_min, int rank_max, int num_threads, const char *co
pg_prepare_params[0] = PG_OID_INT4;
res = PQprepare(conn, "index_sectors",
"select geometry_sector,count(*) from placex where rank_search = $1 and indexed = false and name is not null group by geometry_sector order by geometry_sector",
"select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
1, pg_prepare_params);
if (PQresultStatus(res) != PGRES_COMMAND_OK) exit(EXIT_FAILURE);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
exit(EXIT_FAILURE);
}
PQclear(res);
pg_prepare_params[0] = PG_OID_INT4;
pg_prepare_params[1] = PG_OID_INT4;
res = PQprepare(conn, "index_sector_places",
"select place_id from placex where rank_search = $1 and geometry_index(geometry,indexed,name) = $2",
"select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
2, pg_prepare_params);
if (PQresultStatus(res) != PGRES_COMMAND_OK) exit(EXIT_FAILURE);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
exit(EXIT_FAILURE);
}
PQclear(res);
// Build the data for each thread
@@ -82,11 +90,15 @@ void nominatim_index(int rank_min, int rank_max, int num_threads, const char *co
exit(EXIT_FAILURE);
}
pg_prepare_params[0] = PG_OID_INT8;
pg_prepare_params[0] = PG_OID_INT4;
res = PQprepare(thread_data[i].conn, "index_placex",
"update placex set indexed = true where place_id = $1",
"update placex set indexed_status = 0 where place_id = $1",
1, pg_prepare_params);
if (PQresultStatus(res) != PGRES_COMMAND_OK) exit(EXIT_FAILURE);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
exit(EXIT_FAILURE);
}
PQclear(res);
nominatim_exportCreatePreparedQueries(thread_data[i].conn);
@@ -159,7 +171,7 @@ void nominatim_index(int rank_min, int rank_max, int num_threads, const char *co
PQclear(resPlaces);
exit(EXIT_FAILURE);
}
if (PQftype(resPlaces, 0) != PG_OID_INT8)
if (PQftype(resPlaces, 0) != PG_OID_INT4)
{
fprintf(stderr, "Place_id value has unexpected type\n");
PQclear(resPlaces);
@@ -236,8 +248,8 @@ void *nominatim_indexThread(void * thread_data_in)
const char *paramValues[1];
int paramLengths[1];
int paramFormats[1];
uint64_t paramPlaceID;
uint64_t place_id;
uint32_t paramPlaceID;
uint32_t place_id;
while(1)
{
@@ -248,13 +260,13 @@ void *nominatim_indexThread(void * thread_data_in)
break;
}
place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
place_id = PGint32(*((uint32_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
(*thread_data->count)++;
pthread_mutex_unlock( thread_data->count_mutex );
//printf(" Processing place_id %ld\n", place_id);
paramPlaceID = PGint64(place_id);
paramPlaceID = PGint32(place_id);
paramValues[0] = (char *)&paramPlaceID;
paramLengths[0] = sizeof(paramPlaceID);
paramFormats[0] = 1;

View File

@@ -202,6 +202,11 @@ int main(int argc, char *argv[])
}
PQfinish(conn);
if (!index && !export && !import)
{
fprintf(stderr, "Please select index, export or import.\n");
exit(EXIT_FAILURE);
}
if (index) nominatim_index(0, 30, threads, conninfo, file);
if (export) nominatim_export(0, 30, conninfo, file);
if (import) nominatim_import(conninfo, tagsfile, file);