A MapReduce Technique for Tabled Prolog Parallelism (draft)

Paul Fodor and Diptikalyan Saha

Abstract

The MapReduce technique for Tabled Prolog parallelism takes a Prolog program, analyzes it, divides it into multiple small programs, and assigns copies of these programs to multiple XSB instances running on different machines, each working on a different set of data. It then combines the result sets obtained from the different machines into the result of the original query.

Documentation:

in progress

Sources:

in progress (see below) http://www.cs.sunysb.edu/~pfodor/mode_analysis.P, http://www.cs.sunysb.edu/~pfodor/split.P.

Algorithm and implementation:

Date: April 5, 2009

%%%%%%%%%%%%%begin%%%%%%%%%%%%%
/*
Given a query and a prolog program, find the modes of call to
predicates in the program for that query.
The prolog program is specified as p_clause(Head,Body).
*/

:- table get_mode/1.
:- import trie_assert/1 from tables.

% query_mode(+Query,-L)
% Query is a goal whose arguments are the atoms 'input' or 'output'.
% L is the list of calling modes (terms such as p(input,output)) recorded
% by dump_mode/1 while the abstracted query is explored.
% NOTE(review): get_mode/1 and dump_mode/1 are tabled; a second call with the
% same query may be answered from completed tables without re-recording any
% modes -- confirm whether the tables must be abolished between calls.
query_mode(Query,L):-
	Query=..[Pred|Args],
	% Positions marked 'output' become fresh variables, the rest stay 'input'.
	change_out_to_var(Args,Args1),
	NewQuery=..[Pred|Args1],
	% Failure-driven loop: every derivation is explored for its side effects.
	(get_mode(NewQuery),fail;true),
	findall(X,mode(X),L),
	% The recorded facts are mode/1, so the retraction pattern needs one
	% argument; retractall(mode) addressed mode/0 and left every fact behind.
	retractall(mode(_)).

% get_mode(+Goal)
% Abstract interpreter over the program given as p_clause(Head,Body):
% control constructs are traversed structurally, every other goal has its
% calling mode recorded through dump_mode/1 before it is resolved.
get_mode(findall(Template,Goal,Results)):-
	!,
	findall(Template,get_mode(Goal),Results).
get_mode((Left,Right)):-
	!,
	get_mode(Left),
	get_mode(Right).
get_mode((Left;Right)):-
	!,
	(   get_mode(Left)
	;   get_mode(Right)
	).
get_mode(not(Goal)):-
	!,
	not(get_mode(Goal)).
get_mode(tnot(Goal)):-
	!,
	tnot(get_mode(Goal)).
get_mode(Goal):-
	% Extensional (fact) predicate: record the mode, then bind every
	% argument to 'input' -- assuming the edb will be ground.
	functor(Goal,Name,Arity),
	edb(Name/Arity),
	!,
	dump_mode(Goal),
	ground(Arity,Inputs),
	Goal=..[Name|Inputs].
get_mode(Goal):-
	% Intensional predicate: record the mode and walk a matching clause body.
	dump_mode(Goal),
	p_clause(Goal,Body),
	execute_list(Body).

% execute_list(+Goals)
% Runs get_mode/1 on each goal of a clause body (a list), left to right.
execute_list([]).
execute_list([Goal|Goals]):-
	get_mode(Goal),
	execute_list(Goals).

:- table(dump_mode/1).
% dump_mode(+Call)
% Records the calling mode of Call as a mode/1 fact: each argument of Call
% is replaced by 'output' when unbound and by 'input' otherwise.
dump_mode(Call):-
	Call=..[Name|Actuals],
	check_args(Actuals,Modes),
	ModeTerm=..[Name|Modes],
	trie_assert(mode(ModeTerm)).

% check_args(+Args,-Modes)
% Maps each argument to 'input' when it is bound and to 'output' when it is
% an unbound variable.
check_args([],[]).
check_args([Arg|Args],[Mode|Modes]):-
	(   nonvar(Arg)
	->  Mode=input
	;   Mode=output
	),
	check_args(Args,Modes).

% ground(+N,-Inputs)
% Inputs is a list of N occurrences of the atom 'input'.
ground(0,[]).
ground(Count,[input|Rest]):-
	Count>0,
	Remaining is Count-1,
	ground(Remaining,Rest).

% change_out_to_var(+ModeArgs,-Args)
% Turns a list of mode atoms into call arguments: a position holding
% 'output' is left as a fresh variable, any other position becomes 'input'.
change_out_to_var([],[]).
change_out_to_var([ModeArg|ModeArgs],[Arg|Args]):-
	(   ModeArg=output
	->  true            % Arg stays unbound
	;   Arg=input
	),
	change_out_to_var(ModeArgs,Args).


/* Split a fact file into multiple fact files based on the calling modes and some page size. */

% split(+InputFile,+CallingModes,+PageSize,-Index).
% Example query: ?- split('input_01.P',[edge(input,output)],2,Index).
% Splits InputFile once per calling mode. Index holds one
% mode(CallingMode,OutputHandles) entry per mode, in the order given.
% The page-file counter is reset once per split/4 call, not once per mode:
% page files are named after the counter value, so resetting it for every
% mode made each later mode overwrite the pages written for earlier modes.
split(InputFile,CallingModes,PageSize,Index):-
	resetCounter(file),
	split_modes(InputFile,CallingModes,PageSize,Index).

% split_modes(+InputFile,+CallingModes,+PageSize,-Index)
% Helper for split/4: processes the calling modes one after another while
% sharing a single page-file counter.
split_modes(InputFile,[H|RestCallingModes],PageSize,[mode(H,OutputHandles)|RestOutputHandles]):-
	split_mode(InputFile,H,PageSize,OutputHandles),
	split_modes(InputFile,RestCallingModes,PageSize,RestOutputHandles).
split_modes(_InputFile,[],_PageSize,[]).

% split_mode(+InputFile,+CM,+Size,-OutputHandles)
% Distributes the terms of InputFile over page files for calling mode CM.
% OutputHandles is a list of handle(Key,OutputFiles,CurrentFileSize); the
% streams it mentions are already closed when this predicate returns, only
% the file names remain usable.
split_mode(InputFile,CM,Size,OutputHandles):-
	% Start a counter only when none exists yet, so that a direct call still
	% works while calls made through split/4 keep numbering consecutively.
	( counter(file,_) -> true ; resetCounter(file) ),
	open(InputFile,read,InputHandle),
	PrevOutputHandles=[], % handle(Key,OutputFiles,CurrentFileSize)
	repeat_read(InputHandle,CM,Size,PrevOutputHandles,OutputHandles),
	%write(OutputHandles),
	close(InputHandle), closeOutputFiles(OutputHandles). % close all open files

% repeat_read(+InputHandle,+CM,+Size,+PrevOutputHandles,-OutputHandles)
% Reads terms from InputHandle until end of file. Each term that matches
% calling mode CM is appended to the current page file of its key; a new
% page file is opened once the current one already holds Size terms.
% Terms that do not match CM (directives, facts of other predicates,
% unbound terms) are skipped. Previously the first such term was treated
% like end of file and silently truncated the split.
% Handles are threaded through as handle(Key,OutputFiles,CurrentFileSize),
% with the most recently opened file at the head of OutputFiles.
repeat_read(InputHandle,CM,Size,PrevOutputHandles,OutputHandles):-
	read(InputHandle,Term),
	Term \== end_of_file,
	!,
	(   nonvar(Term),
	    generateKey(CM,Term,Key)
	->  repeat_read_store(Key,Term,Size,PrevOutputHandles,TempOutputHandles)
	;   TempOutputHandles = PrevOutputHandles   % not a fact for CM: skip it
	),
	repeat_read(InputHandle,CM,Size,TempOutputHandles,OutputHandles).
repeat_read(_,_,_,OutputHandles,OutputHandles).

% repeat_read_store(+Key,+Term,+Size,+PrevOutputHandles,-NextOutputHandles)
% Helper: writes Term to the page belonging to Key and returns the updated
% handle list with the handle for Key moved to the front.
repeat_read_store(Key,Term,Size,PrevOutputHandles,NextOutputHandles):-
	(   mySelect(handle(Key,OutputFileList,CurrentFileSize),PrevOutputHandles,Rest)
	->  (   CurrentFileSize = Size
	    ->  % current page is full: start a new one for this key
	        repeat_read_new_file(Term,NewFile),
	        NextOutputHandles = [handle(Key,[NewFile|OutputFileList],1)|Rest]
	    ;   % room left on the current page
	        OutputFileList = [file(OutputFileHandle,_OutputFile)|_],
	        CurrentFileSize1 is CurrentFileSize+1,
	        repeat_read_write(OutputFileHandle,Term),
	        NextOutputHandles = [handle(Key,OutputFileList,CurrentFileSize1)|Rest]
	    )
	;   % first term seen for this key
	    repeat_read_new_file(Term,NewFile),
	    NextOutputHandles = [handle(Key,[NewFile],1)|PrevOutputHandles]
	).

% repeat_read_new_file(+Term,-File)
% Helper: opens the next page file, named after the current value of the
% 'file' counter, writes Term to it and returns file(Handle,FileName).
repeat_read_new_file(Term,file(OutputFileHandle,OutputFile)):-
	counter(file,N),incCounter(file),number_codes(N,List),atom_codes(OutputFile,List),
	open(OutputFile,write,OutputFileHandle),
	repeat_read_write(OutputFileHandle,Term).

% repeat_read_write(+Handle,+Term)
% Helper: writes Term as a clause. writeq/2 keeps quotes on atoms that need
% them, so the page can be read back with read/2.
repeat_read_write(OutputFileHandle,Term):-
	writeq(OutputFileHandle,Term),write(OutputFileHandle,'.\n').

% generateKey(+CM,+Term,-Output)
% Builds the page key of Term under calling mode CM: arguments in 'input'
% positions are kept, arguments in 'output' positions are replaced by the
% atom 'output'. Fails when Term and CM differ in name or arity.
generateKey(CM,Term,Output):-
	CM=..[Name|Modes],
	Term=..[Name|Values],
	generateArgs(Modes,Values,KeyArgs),
	Output=..[Name|KeyArgs].

% generateArgs(+Modes,+Values,-KeyArgs)
% Position-wise projection used by generateKey/3.
generateArgs([],[],[]).
generateArgs([output|Modes],[_Ignored|Values],[output|KeyArgs]):-
	generateArgs(Modes,Values,KeyArgs),
	!.
generateArgs([input|Modes],[Value|Values],[Value|KeyArgs]):-
	generateArgs(Modes,Values,KeyArgs).

% closeOutputFiles(+OutputHandles)
% Closes every stream referenced by a list of
% handle(Key,OutputFiles,CurrentFileSize) terms.
closeOutputFiles([handle(_,Files,_)|Handles]):-
	closeOutputFileHandles(Files),
	closeOutputFiles(Handles).
closeOutputFiles([]).

% closeOutputFileHandles(+Files)
% Closes the stream of each file(Stream,FileName) entry in turn.
closeOutputFileHandles([file(Stream,_)|Files]):-
	close(Stream),
	closeOutputFileHandles(Files).
closeOutputFileHandles([]).

:- dynamic(counter/2).

% incCounter(+Name)
% Replaces the stored value of counter Name by its successor.
% Fails when no counter(Name,_) fact exists.
incCounter(Name):-
	counter(Name,Current),
	retractall(counter(Name,_)),
	Next is Current+1,
	assert(counter(Name,Next)).

% resetCounter(+Name)
% (Re)creates counter Name with the value 0.
resetCounter(Name):-
	retractall(counter(Name,_)),
	assert(counter(Name,0)).

% mySelect(?Elem,?List,?Remainder)
% Remainder is List with one occurrence of Elem removed; further
% occurrences are tried on backtracking.
mySelect(Elem,[Elem|Tail],Tail).
mySelect(Elem,[Kept|Tail],[Kept|Remainder]):-
	mySelect(Elem,Tail,Remainder).

% myMember(?Elem,?List)
% Succeeds once for every element of List that unifies with Elem.
myMember(Elem,[Elem|_]).
myMember(Elem,[_|Tail]):-
	myMember(Elem,Tail).


% execQuery(?Key,+PageIndex)
% Answers the goal Key from the page files listed in PageIndex (a list of
% mode(CallingMode,OutputHandles) entries): the calling mode of Key selects
% the entry, the handle whose key unifies with Key selects the page files.
execQuery(Key,PageIndex):-
	call_mode(Key,Mode),
	myMember(mode(Mode,Handles),PageIndex),
	% Match against a copy so the variables of Key itself are not bound
	% while the handle is being looked up.
	copy_term(Key,Probe),
	myMember(handle(Probe,Files,_),Handles),
	pageLoader(Key,Files).
% pageLoader(?Key,+Files)
% Enumerates, on backtracking, every term stored in the page files Files
% (a list of file(_,FileName)) that unifies with Key.
% Each page is read to the end and its stream is closed before answers are
% returned. Previously only the first term of each page was examined and
% the stream was left open.
pageLoader(Key,[file(_,FileName)|_Rest]):-
	open(FileName,read,FileHandler),
	% Close the stream even when reading raises an error, then re-raise.
	catch(pageLoader_terms(FileHandler,Terms),
	      Error,
	      (close(FileHandler),throw(Error))),
	close(FileHandler),
	myMember(Key,Terms).
pageLoader(Key,[_|Rest]):-
	pageLoader(Key,Rest).

% pageLoader_terms(+Stream,-Terms)
% Helper: collects all terms readable from Stream up to end of file.
pageLoader_terms(Stream,Terms):-
	read(Stream,Term),
	(   Term == end_of_file
	->  Terms = []
	;   Terms = [Term|MoreTerms],
	    pageLoader_terms(Stream,MoreTerms)
	).

% call_mode(+Goal,-CM)
% CM is Goal with each argument replaced by its mode: 'output' for an
% unbound variable, 'input' for anything else.
call_mode(Goal,CM):-
	Goal=..[Name|Actuals],
	check_args(Actuals,Modes),
	CM=..[Name|Modes].

% check_args(+Args,-Modes)
% Position-wise classification of arguments as 'input' or 'output'.
check_args([],[]).
check_args([Arg|Args],[Mode|Modes]):-
	(   nonvar(Arg)
	->  Mode=input
	;   Mode=output
	),
	check_args(Args,Modes).

%%%%%%%%%%%%%end%%%%%%%%%%%%

Paul Fodor, 04/05/2009