// public class JXServer.RoomId extends Object
package edu.stanford.junction.provider.jx;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.json.JSONException;
import org.json.JSONObject;

import edu.stanford.junction.JunctionException;
import edu.stanford.junction.JunctionMaker;
import edu.stanford.junction.SwitchboardConfig;
import edu.stanford.junction.api.activity.ActivityScript;
import edu.stanford.junction.api.activity.JunctionActor;
import edu.stanford.junction.api.messaging.MessageHeader;
import edu.stanford.junction.provider.jx.json.JsonHandler;
import edu.stanford.junction.provider.jx.json.JsonSocketHandler;
import edu.stanford.junction.provider.jx.json.JsonWebSocketHandler;

public class JXServer {
	// TCP port the accept thread's ServerSocket is bound to.
	public static final int SERVER_PORT = 8283;
	// Tag prefixed to this server's log lines.
	private static final String TAG = "jx_server";
	// Size in bytes of the buffer used to read a new connection's initial header.
	private static final int BUFFER_LENGTH = 2048;
	
	// Activity script stored when a room is first created (if the join carried one), keyed by room.
	private Map<RoomId, JSONObject> mActivityScripts;
	// Every connection thread spawned by the accept thread.
	private Set<ConnectedThread> mConnections;
	// For each room, the joined actor ids mapped to the connection serving them.
	private Map<RoomId, Map<String, ConnectedThread>> mSubscriptions;
	// Listener thread; created in start() and cleared in stop().
	private AcceptThread mAcceptThread;
	
	/**
	 * Manual smoke test: starts a server on {@link #SERVER_PORT}, waits briefly,
	 * then joins three {@link TestActor}s to
	 * <code>junction://localhost/jxserver#jx</code>.
	 *
	 * @param argv ignored
	 */
	public static void main(String[] argv) {
		final String TAG = "test";
		
		JXServer server = new JXServer();
		Log.d(TAG, "Starting server.");
		server.start();
		
		// Give the accept thread a moment to come up before clients connect.
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// Keep the interrupt visible to later blocking calls instead of dropping it.
			Thread.currentThread().interrupt();
		}
		
		final TestActor a1 = server.new TestActor("a1");
		final TestActor a2 = server.new TestActor("a2");
		final TestActor a3 = server.new TestActor("a3");
		// a3 sends a direct ("secret") message to its buddy once it has joined.
		a3.buddy = a2;
		
		final URI uri = URI.create("junction://localhost/jxserver#jx");
		final SwitchboardConfig cfg = JunctionMaker.getDefaultSwitchboardConfig(uri);
		final ActivityScript script = new ActivityScript();
		script.setFriendlyName("Test Session");
		script.setActivityID("org.openjunction.test");
		
		// Flip to false to run the server without the built-in test clients.
		final boolean TEST_CLIENTS = true;
		if (TEST_CLIENTS) {
			try {
				Log.d(TAG, "Attempting to join session");
				JunctionMaker.getInstance(cfg).newJunction(uri, script, a1);
				JunctionMaker.getInstance(cfg).newJunction(uri, script, a2);
				JunctionMaker.getInstance(cfg).newJunction(uri, script, a3);
			} catch (JunctionException e) {
				Log.e(TAG, "error joining junction", e);
			}
		}
	}
	
	
	class TestActor extends JunctionActor {
		JunctionActor buddy;
		final String name;
		public TestActor(String name) {
			super("test");
			this.name = name;
		}
		
		@Override
		public void onMessageReceived(MessageHeader header, JSONObject message) {
			Log.d(TAG, name + " got: " + message.toString() + " !" + " from " + header.from);
		}
		
		@Override
		public void onActivityJoin() {
			super.onActivityJoin();
			Log.d(TAG, name + " joined session!");
			try {
				new Thread() {
					boolean loop = false;
					JSONObject hello = new JSONObject("{\"msg\":\"hello world! from: " + name + "\"}");
					public void run() {
						do {
							sendMessageToSession(hello);
							Log.d(TAG, name + " sent a message.");
							try {
								Thread.sleep(5000);
							} catch (Exception e) {}
						} while (loop);
					};
				}.start();
				
				if ("a3".equals(name)) {
					sendMessageToActor(buddy.getActorID(), new JSONObject("{\"psst\":\"hi\"}"));
					Log.d(TAG, name + " sent a secret message to " );
				}
			} catch (JSONException e) {
				e.printStackTrace();
			}
		}
		
		@Override
		public void onActivityCreate() {
			Log.d(TAG, "CREATED the session!!!");
		}
	};
	
	
	/**
	 * Creates an idle server. No socket is opened and no state is allocated
	 * here; that happens in {@link #start()}.
	 */
	public JXServer() {
	}
	
	/**
	 * Brings the server up: allocates fresh bookkeeping for connections, room
	 * subscriptions and activity scripts, then launches the thread that
	 * accepts clients. Clients may join any room by name.
	 */
	public void start() {
		mActivityScripts = new ConcurrentHashMap<RoomId, JSONObject>();
		mSubscriptions = new ConcurrentHashMap<RoomId, Map<String, ConnectedThread>>();
		mConnections = new HashSet<ConnectedThread>();

		AcceptThread acceptor = new AcceptThread();
		mAcceptThread = acceptor;
		acceptor.start();
	}
	
	/**
	 * Shuts the server down: cancels the accept thread (which also cancels the
	 * connections it spawned) and drops all room and script bookkeeping.
	 * Safe to call when the server was never started or has already been
	 * stopped; in that case it does nothing.
	 */
	public void stop() {
		if (mAcceptThread != null) {
			mAcceptThread.cancel();
			mAcceptThread = null;
		}
		mConnections = null;
		
		if (mSubscriptions != null) {
			mSubscriptions.clear();
			mSubscriptions = null;
		}
		
		if (mActivityScripts != null) {
			mActivityScripts.clear();
			mActivityScripts = null;
		}
	}
	
	/**
	 * Thread that owns the listening socket on {@link #SERVER_PORT}. Each
	 * accepted client gets its own {@link ConnectedThread}, which is started
	 * and recorded in {@code mConnections}.
	 */
	private class AcceptThread extends Thread {
		// The local server socket; null when it could not be opened.
		private final ServerSocket mmServerSocket;

		public AcceptThread() {
			ServerSocket tmp = null;

			// Create a new listening server socket
			try {
				tmp = new ServerSocket(SERVER_PORT);
			} catch (IOException e) {
				System.err.println("Could not open server socket");
				e.printStackTrace(System.err);
			}
			mmServerSocket = tmp;
		}

		/**
		 * Accepts clients until the server socket is closed or accept() fails.
		 */
		public void run() {
			setName("AcceptThread");

			if (mmServerSocket == null) {
				Log.e(TAG, "Server socket was not opened; not accepting connections");
			} else {
				acceptLoop();
			}
			Log.d(TAG, "END mAcceptThread");
		}

		// Blocks in accept() and hands every new socket to its own ConnectedThread.
		private void acceptLoop() {
			while (true) {
				// Declared per iteration so that a failed accept() can never
				// hand a previously accepted socket to a second ConnectedThread.
				Socket socket;
				try {
					// This is a blocking call and will only return on a
					// successful connection or an exception
					Log.d(TAG, "waiting for client...");
					socket = mmServerSocket.accept();
					Log.d(TAG, "Client connected!");
				} catch (SocketException e) {
					// Expected when cancel() closes the socket; anything else is an error.
					if (!mmServerSocket.isClosed()) {
						Log.e(TAG, "accept() failed", e);
					}
					return;
				} catch (IOException e) {
					Log.e(TAG, "accept() failed", e);
					return;
				}

				if (socket == null) {
					return;
				}

				ConnectedThread conThread = new ConnectedThread(socket);
				conThread.start();
				mConnections.add(conThread);
			}
		}

		/**
		 * Closes the listening socket, which unblocks {@link #run()}, then
		 * cancels and forgets every connection spawned so far.
		 */
		public void cancel() {
			Log.d(TAG, "cancel " + this);
			if (mmServerSocket != null) {
				try {
					mmServerSocket.close();
				} catch (IOException e) {
					Log.e(TAG, "close() of server failed", e);
				}
			}

			for (ConnectedThread conn : mConnections) {
				conn.cancel();
			}
			mConnections.clear();
		}
	}
	
    /**
     * This thread runs during a connection with a remote device.
     * It handles all incoming and outgoing transmissions.
     */
    private class ConnectedThread extends Thread {
        // Client socket handed to the constructor; closed in cancel().
        private final Socket mmSocket;
        // Streams obtained from mmSocket; both stay null if obtaining them failed in the constructor.
        private final InputStream mmInStream;
        private final OutputStream mmOutStream;
        // Room/actor-id pairs this connection has joined; emptied and set to null by cancel().
        private Set<RoomOccupancy> mmSubscriptions;
        // JSON reader/writer for this connection; assigned once the connection type is known.
        private JsonHandler mmJsonHelper;

        /**
         * Wraps an accepted client socket and captures its streams. If the
         * streams cannot be obtained the failure is logged and both stream
         * fields are left null.
         *
         * @param socket the socket returned by accept()
         */
        public ConnectedThread(Socket socket) {
            Log.d(TAG, "create ConnectedThread");

            mmSocket = socket;
            mmSubscriptions = new HashSet<RoomOccupancy>();

            InputStream in = null;
            OutputStream out = null;
            try {
                in = socket.getInputStream();
                out = socket.getOutputStream();
            } catch (IOException e) {
                Log.e(TAG, "temp sockets not created", e);
            }
            mmInStream = in;
            mmOutStream = out;
        }

        /**
         * Reads the first chunk sent by the client, uses its prefix to decide
         * whether this is an HTTP (WebSocket) request or a raw Junction
         * connection, and then serves the connection until it ends.
         * {@link #cancel()} always runs afterwards, even if serving fails.
         *
         * The header is taken from a single read of at most BUFFER_LENGTH
         * bytes; a header split across several reads is not reassembled.
         */
        public void run() {
            Log.d(TAG, "BEGIN mConnectedThread");

            try {
                if (mmInStream == null || mmOutStream == null) {
                    // The constructor already logged why the streams are missing.
                    Log.e(TAG, "Connection has no usable streams");
                    return;
                }

                // Read header information, determine connection type
                byte[] buffer = new byte[BUFFER_LENGTH];
                int bytes = mmInStream.read(buffer);
                if (bytes < 0) {
                    // End of stream: the peer closed before sending anything.
                    Log.d(TAG, "Connection closed before a header was received");
                    return;
                }
                String header = new String(buffer, 0, bytes);

                // determine request type
                if (header.startsWith("GET ")) {
                    Log.d(TAG, "Found HTTP GET request");
                    doHttpConnection(header);
                } else if (header.startsWith("JUNCTION")) {
                    Log.d(TAG, "Found Junction connection");
                    mmJsonHelper = new JsonSocketHandler(mmInStream, mmOutStream);
                    doJunctionConnection();
                } else {
                    Log.e(TAG, "Unrecognized connection header");
                }
            } catch (IOException e) {
                Log.e(TAG, "Error reading connection header", e);
            } finally {
                // No longer listening.
                cancel();
            }
        }
        
        /**
         * Performs the legacy key1/key2 WebSocket opening handshake on an HTTP
         * GET request and, when it succeeds, serves the connection as a
         * Junction JSON stream.
         *
         * The request is rejected (logged, nothing written) when it is not a
         * WebSocket upgrade, when either Sec-WebSocket-Key header is missing,
         * when a key contains no spaces, or when a key's number is not a
         * multiple of its space count.
         *
         * NOTE(review): the final challenge bytes are taken from the decoded
         * header string; if the client sends non-ASCII bytes there, the
         * round trip through the platform charset may alter them - confirm.
         *
         * @param header the raw request text, including the 8 trailing
         *               challenge characters
         */
        private void doHttpConnection(String header) {
            Log.d(TAG, "HTTP header:\n" + header);
            boolean isWebSocket = false;
            String origin = null;
            String host = null;
            String webSocketKey1 = null;
            String webSocketKey2 = null;

            for (String l : header.split("\r\n")) {
                if (l.startsWith("Upgrade: WebSocket")) {
                    isWebSocket = true;
                } else if (l.startsWith("Sec-WebSocket-Key1: ")) {
                    webSocketKey1 = l.substring("Sec-WebSocket-Key1: ".length());
                } else if (l.startsWith("Sec-WebSocket-Key2: ")) {
                    webSocketKey2 = l.substring("Sec-WebSocket-Key2: ".length());
                } else if (l.startsWith("Origin: ")) {
                    origin = l.substring("Origin: ".length());
                } else if (l.startsWith("Host: ")) {
                    host = l.substring("Host: ".length());
                }
            }

            if (!isWebSocket) {
                Log.e(TAG, "Not a websocket request");
                return;
            }

            if (webSocketKey1 == null || webSocketKey2 == null) {
                Log.e(TAG, "WebSocket handshake is missing a Sec-WebSocket-Key header");
                return;
            }

            long v1 = getKeyNumber(webSocketKey1);
            long v2 = getKeyNumber(webSocketKey2);

            int s1 = countSpaces(webSocketKey1);
            int s2 = countSpaces(webSocketKey2);

            // A zero space count would divide by zero below; a non-zero
            // remainder means the key is not a valid challenge.
            if (s1 == 0 || s2 == 0 || v1 % s1 != 0 || v2 % s2 != 0) {
                Log.e(TAG, "WebSocket failed handshake");
                return;
            }

            long p1 = v1 / s1;
            long p2 = v2 / s2;
            // The last 8 characters of the request are the third challenge part.
            String p3 = header.substring(header.length() - 8);
            byte[] response = webSocketResponse(p1, p2, p3);
            if (response == null) {
                // webSocketResponse already logged the cause.
                return;
            }
            String endpoint = "ws://" + host + "/"; // TODO

            this.write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n");
            this.write("Upgrade: WebSocket\r\n");
            this.write("Connection: Upgrade\r\n");
            this.write("Sec-WebSocket-Origin: " + origin + "\r\n");
            this.write("Sec-WebSocket-Location: " + endpoint + "\r\n");
            this.write("\r\n");
            this.write(response, response.length);
            try {
                mmOutStream.flush();
            } catch (IOException e) {
                Log.e(TAG, "Error completing handshake", e);
                return;
            }

            // TODO: support length prefixed data?
            mmJsonHelper = new JsonWebSocketHandler(mmInStream, mmOutStream);
            doJunctionConnection();
        }
        
        /**
         * Builds the handshake reply: the MD5 digest of a 16-byte challenge
         * made of the low 32 bits of {@code p1} (most significant byte first),
         * the low 32 bits of {@code p2} in the same order, and the first 8
         * bytes of {@code p3}.
         *
         * @return the 16-byte digest, or null if anything goes wrong (the
         *         failure is logged)
         */
        private byte[] webSocketResponse(long p1, long p2, String p3) {
            try {
                byte[] challenge = new byte[16];
                for (int i = 0; i < 4; i++) {
                    // i = 0 selects bits 24..31, i = 3 selects bits 0..7.
                    int shift = 8 * (3 - i);
                    challenge[i] = (byte) (p1 >>> shift);
                    challenge[4 + i] = (byte) (p2 >>> shift);
                }

                byte[] tail = p3.getBytes();
                System.arraycopy(tail, 0, challenge, 8, 8);

                return MessageDigest.getInstance("MD5").digest(challenge);
            } catch (Exception e) {
                Log.e(TAG, "Error computing response to websocket challenge", e);
                return null;
            }
        }
        
        /**
         * Concatenates the ASCII digits found in {@code key}, in order, and
         * returns them as one decimal number. All other characters are
         * skipped; a key without digits yields 0.
         */
        private long getKeyNumber(String key) {
            long value = 0;
            for (char ch : key.toCharArray()) {
                if (ch < '0' || ch > '9') {
                    continue;
                }
                value = value * 10 + (ch - '0');
            }
            return value;
        }
        
        /**
         * Returns how many space characters (' ') occur in {@code key}.
         */
        private int countSpaces(String key) {
            int spaces = 0;
            int at = key.indexOf(' ');
            while (at >= 0) {
                spaces++;
                at = key.indexOf(' ', at + 1);
            }
            return spaces;
        }
        
        /**
         * Pumps messages from the client: each JSON object produced by the
         * handler is passed to {@link #handleJson(JSONObject)}. Returns when
         * the handler yields null or reading fails with an IOException
         * (which is logged as a disconnect).
         */
        private void doJunctionConnection() {
            try {
                JSONObject incoming;
                while ((incoming = mmJsonHelper.jsonFromStream()) != null) {
                    handleJson(incoming);
                }
            } catch (IOException e) {
                Log.e(TAG, "disconnected", e);
            }
        }
                
        /**
         * Handles one message from this connection's client. Only messages
         * carrying a system object (key Junction.JX_SYS_MSG inside the
         * Junction.NS_JX object) are acted on:
         * <ul>
         *   <li>"join" (with "id", optional "script"): subscribe this
         *       connection to the named room and reply with a join
         *       acknowledgement;</li>
         *   <li>"action" = "send_s" (with "session"): forward the message,
         *       minus its system object, to every member of the session;</li>
         *   <li>"action" = "send_a" (with "session" and "actor"): forward it
         *       to that one actor, if present.</li>
         * </ul>
         * Messages that name a session nobody has joined are logged and
         * dropped. Malformed messages are logged and ignored.
         */
        private void handleJson(JSONObject json) {
            try {
                Log.d(TAG, json.toString());

                if (!json.has(Junction.NS_JX)) {
                    return;
                }
                JSONObject jx = json.getJSONObject(Junction.NS_JX);
                if (!jx.has(Junction.JX_SYS_MSG)) {
                    return;
                }
                JSONObject sys = jx.getJSONObject(Junction.JX_SYS_MSG);

                // Join
                if (sys.has("join")) {
                    handleJoin(sys);
                }

                String action = sys.optString("action");
                if ("send_s".equals(action)) {
                    // Send message to session
                    RoomId room = getRoomId(sys.getString("session"));
                    // Recipients get the payload without the routing envelope.
                    jx.remove(Junction.JX_SYS_MSG);
                    relayToSession(room, json);
                } else if ("send_a".equals(action)) {
                    // Send message to a single actor
                    RoomId room = getRoomId(sys.getString("session"));
                    String actor = sys.getString("actor");
                    jx.remove(Junction.JX_SYS_MSG);
                    relayToActor(room, actor, json);
                }
            } catch (JSONException e) {
                Log.e(TAG, "Error building json object", e);
            }
        }

        // Subscribes this connection to the room named in sys and sends the join acknowledgement.
        private void handleJoin(JSONObject sys) throws JSONException {
            RoomId joinRoom = getRoomId(sys.getString("join"));
            String me = sys.getString("id");

            synchronized (joinRoom) {
                mmSubscriptions.add(new RoomOccupancy(joinRoom, me));

                // The first actor to join a room creates it and may supply its script.
                Map<String, ConnectedThread> participants = mSubscriptions.get(joinRoom);
                boolean isCreator = (participants == null);
                if (isCreator) {
                    participants = new HashMap<String, ConnectedThread>();
                    mSubscriptions.put(joinRoom, participants);

                    JSONObject script = sys.optJSONObject("script");
                    if (script != null) {
                        mActivityScripts.put(joinRoom, script);
                    }
                }
                participants.put(me, this);

                // Response
                try {
                    JSONObject joinedObj = new JSONObject();
                    joinedObj.put(Junction.JX_SYS_MSG, true);
                    joinedObj.put(Junction.JX_JOINED, true);
                    joinedObj.put(Junction.JX_CREATOR, isCreator);
                    if (isCreator) {
                        JSONObject script = mActivityScripts.get(joinRoom);
                        if (script != null) {
                            joinedObj.put(Junction.JX_SCRIPT, script);
                        }
                    }
                    JSONObject joinedMsg = new JSONObject();
                    joinedMsg.put(Junction.NS_JX, joinedObj);
                    mmJsonHelper.sendJson(joinedMsg);
                } catch (Exception e) {
                    Log.e(TAG, "Error sending join response", e);
                }
            }
        }

        // Delivers json to every connection currently subscribed to room.
        private void relayToSession(RoomId room, JSONObject json) {
            synchronized (room) {
                Map<String, ConnectedThread> peers = mSubscriptions.get(room);
                if (peers == null) {
                    Log.e(TAG, "No participants in session " + room.name);
                    return;
                }
                for (ConnectedThread conn : peers.values()) {
                    conn.sendJson(json);
                }
            }
        }

        // Delivers json to the connection registered for actor in room, if there is one.
        private void relayToActor(RoomId room, String actor, JSONObject json) {
            synchronized (room) {
                Map<String, ConnectedThread> peers = mSubscriptions.get(room);
                if (peers == null) {
                    Log.e(TAG, "No participants in session " + room.name);
                    return;
                }
                ConnectedThread conn = peers.get(actor);
                if (conn != null) {
                    conn.sendJson(json);
                }
            }
        }

        /**
         * Sends {@code json} to this connection's client through its JSON
         * handler. Any failure is logged and otherwise ignored, so one broken
         * recipient does not interrupt the caller.
         */
        public void sendJson(JSONObject json) {
            try {
                mmJsonHelper.sendJson(json);
            } catch (Exception sendFailure) {
                Log.e(TAG, "Error writing JSON", sendFailure);
            }
        }
        
        /**
         * Writes the leading part of a byte array to the client.
         * An I/O failure is logged, not thrown.
         *
         * @param buffer  source of the bytes to write
         * @param bytes   how many bytes to write, starting at index 0
         */
        public void write(byte[] buffer, int bytes) {
            final int offset = 0;
            try {
                mmOutStream.write(buffer, offset, bytes);
            } catch (IOException ioe) {
                Log.e(TAG, "Exception during write", ioe);
            }
        }
        
        /**
         * Writes a string to the client, encoded with the platform default
         * charset. An I/O failure is logged, not thrown.
         *
         * @param buffer  the text to write
         */
        public void write(String buffer) {
            final byte[] raw = buffer.getBytes();
            try {
                mmOutStream.write(raw, 0, raw.length);
            } catch (IOException ioe) {
                Log.e(TAG, "Exception during write", ioe);
            }
        }

        /**
         * Ends this connection: removes it from every room it joined
         * (dropping rooms that become empty) and closes the socket.
         *
         * Safe to call more than once. It is invoked both when the accept
         * thread is cancelled and when run() finishes; only the first call
         * performs the room cleanup, and the socket is closed even if that
         * cleanup fails.
         */
        public void cancel() {
            // Claim the subscription set so that a second caller sees null and skips cleanup.
            final Set<RoomOccupancy> subscriptions;
            synchronized (this) {
                subscriptions = mmSubscriptions;
                mmSubscriptions = null;
            }

            try {
                // The server may already have dropped its room table in stop().
                final Map<RoomId, Map<String, ConnectedThread>> rooms = mSubscriptions;
                if (subscriptions != null) {
                    if (rooms != null) {
                        for (RoomOccupancy entry : subscriptions) {
                            synchronized (entry.room) {
                                Map<String, ConnectedThread> users = rooms.get(entry.room);
                                if (users == null) {
                                    // Room already removed; nothing left to undo.
                                    continue;
                                }
                                users.remove(entry.id);
                                if (users.isEmpty()) {
                                    rooms.remove(entry.room);
                                }
                            }
                        }
                    }
                    subscriptions.clear();
                }
            } finally {
                try {
                    mmSocket.close();
                } catch (IOException e) {
                    Log.e(TAG, "close() of connect socket failed", e);
                }
            }
        }
    }
	
    /**
     * Records that the actor with a given id occupies a given room. A
     * connection keeps one of these per join so it can undo the join later.
     */
    public class RoomOccupancy {
        /** The room that was joined. */
        public RoomId room;
        /** The id of the actor that joined it. */
        public String id;

        public RoomOccupancy(RoomId r, String me) {
            this.room = r;
            this.id = me;
        }
    }
    
    /**
     * Wrapper around a room name. Instances are used as lock objects and map
     * keys, so a dedicated type is used instead of locking on a String.
     * The constructor is private: instances are only created inside JXServer.
     */
    public class RoomId {
        /** The room's name as supplied by clients. */
        public String name;

        private RoomId(String roomName) {
            name = roomName;
        }
    }
    
    // One RoomId per room name. Accessed only from getRoomId, which is synchronized.
    Map<String,RoomId> mRoomMap = new HashMap<String,RoomId>();

    /**
     * Returns the single {@link RoomId} instance for {@code name}, creating
     * it on first use.
     *
     * Synchronized because connection threads call this concurrently and
     * then lock on, and key maps by, the returned object. Without it two
     * threads could each create a RoomId for the same name and end up
     * holding different locks for the same room.
     *
     * @param name the room name
     * @return the RoomId shared by all callers using this name
     */
    public synchronized RoomId getRoomId(String name) {
        RoomId room = mRoomMap.get(name);
        if (room == null) {
            room = new RoomId(name);
            mRoomMap.put(name, room);
        }
        return room;
    }
    
	/**
	 * Minimal console logger. Debug lines go to standard output, error lines
	 * to standard error; every line has the form "tag: message".
	 */
	public static class Log {
		/** Prints a debug line to standard output. */
		public static void d(String tag, String msg) {
			String line = tag + ": " + msg;
			System.out.println(line);
		}
		
		/** Prints an error line to standard error. */
		public static void e(String tag, String msg) {
			String line = tag + ": " + msg;
			System.err.println(line);
		}
		
		/** Prints an error line followed by the exception's stack trace, both to standard error. */
		public static void e(String tag, String msg, Exception e) {
			Log.e(tag, msg);
			e.printStackTrace(System.err);
		}
	}
}