package edu.stanford.junction.provider.jx;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.json.JSONException;
import org.json.JSONObject;
import edu.stanford.junction.JunctionException;
import edu.stanford.junction.JunctionMaker;
import edu.stanford.junction.SwitchboardConfig;
import edu.stanford.junction.api.activity.ActivityScript;
import edu.stanford.junction.api.activity.JunctionActor;
import edu.stanford.junction.api.messaging.MessageHeader;
import edu.stanford.junction.provider.jx.json.JsonHandler;
import edu.stanford.junction.provider.jx.json.JsonSocketHandler;
import edu.stanford.junction.provider.jx.json.JsonWebSocketHandler;
public class JXServer {
public static final int SERVER_PORT = 8283;
private static final String TAG = "jx_server";
private static final int BUFFER_LENGTH = 2048;
private Map<RoomId, JSONObject> mActivityScripts;
private Set<ConnectedThread> mConnections;
private Map<RoomId, Map<String, ConnectedThread>> mSubscriptions;
private AcceptThread mAcceptThread;
public static void main(String[] argv) {
final String TAG = "test";
JXServer server = new JXServer();
Log.d(TAG, "Starting server.");
server.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
final TestActor a1 = server.new TestActor("a1");
final TestActor a2 = server.new TestActor("a2");
final TestActor a3 = server.new TestActor("a3");
a3.buddy = a2;
final URI uri = URI.create("junction://localhost/jxserver#jx");
final SwitchboardConfig cfg = JunctionMaker.getDefaultSwitchboardConfig(uri);
final ActivityScript script = new ActivityScript();
script.setFriendlyName("Test Session");
script.setActivityID("org.openjunction.test");
boolean TEST_CLIENTS = true;
if (TEST_CLIENTS) {
try {
Log.d(TAG, "Attempting to join session");
JunctionMaker.getInstance(cfg).newJunction(uri, script, a1);
JunctionMaker.getInstance(cfg).newJunction(uri, script, a2);
JunctionMaker.getInstance(cfg).newJunction(uri, script, a3);
} catch (JunctionException e) {
Log.e(TAG, "error joining juction", e);
}
}
}
class TestActor extends JunctionActor {
JunctionActor buddy;
final String name;
public TestActor(String name) {
super("test");
this.name = name;
}
@Override
public void onMessageReceived(MessageHeader header, JSONObject message) {
Log.d(TAG, name + " got: " + message.toString() + " !" + " from " + header.from);
}
@Override
public void onActivityJoin() {
super.onActivityJoin();
Log.d(TAG, name + " joined session!");
try {
new Thread() {
boolean loop = false;
JSONObject hello = new JSONObject("{\"msg\":\"hello world! from: " + name + "\"}");
public void run() {
do {
sendMessageToSession(hello);
Log.d(TAG, name + " sent a message.");
try {
Thread.sleep(5000);
} catch (Exception e) {}
} while (loop);
};
}.start();
if ("a3".equals(name)) {
sendMessageToActor(buddy.getActorID(), new JSONObject("{\"psst\":\"hi\"}"));
Log.d(TAG, name + " sent a secret message to " );
}
} catch (JSONException e) {
e.printStackTrace();
}
}
@Override
public void onActivityCreate() {
Log.d(TAG, "CREATED the session!!!");
}
};
public JXServer() {
}
/**
* Starts a simple chat server, allowing users to
* connect to an arbitrary chat room.
*/
public void start() {
mConnections = new HashSet<ConnectedThread>();
mSubscriptions = new ConcurrentHashMap<RoomId, Map<String, ConnectedThread>>();
mActivityScripts = new ConcurrentHashMap<RoomId, JSONObject>();
mAcceptThread = new AcceptThread();
mAcceptThread.start();
}
public void stop() {
mAcceptThread.cancel();
mAcceptThread = null;
mConnections = null;
mSubscriptions.clear();
mSubscriptions = null;
mActivityScripts.clear();
mActivityScripts = null;
}
private class AcceptThread extends Thread {
// The local server socket
private final ServerSocket mmServerSocket;
public AcceptThread() {
ServerSocket tmp = null;
// Create a new listening server socket
try {
tmp = new ServerSocket(SERVER_PORT);
} catch (IOException e) {
System.err.println("Could not open server socket");
e.printStackTrace(System.err);
}
mmServerSocket = tmp;
}
public void run() {
//Log.d(TAG, "BEGIN mAcceptThread" + this);
setName("AcceptThread");
Socket socket = null;
// Listen to the server socket always
while (true) {
try {
// This is a blocking call and will only return on a
// successful connection or an exception
Log.d(TAG, "waiting for client...");
socket = mmServerSocket.accept();
Log.d(TAG, "Client connected!");
} catch (SocketException e) {
} catch (IOException e) {
Log.e(TAG, "accept() failed", e);
break;
}
// If a connection was accepted
if (socket == null) {
break;
}
//synchronized (JXServer.this) {
ConnectedThread conThread = new ConnectedThread(socket);
conThread.start();
mConnections.add(conThread);
//}
}
Log.d(TAG, "END mAcceptThread");
}
public void cancel() {
Log.d(TAG, "cancel " + this);
try {
mmServerSocket.close();
} catch (IOException e) {
Log.e(TAG, "close() of server failed", e);
}
for (ConnectedThread conn : mConnections) {
conn.cancel();
}
mConnections.clear();
}
}
/**
* This thread runs during a connection with a remote device.
* It handles all incoming and outgoing transmissions.
*/
private class ConnectedThread extends Thread {
private final Socket mmSocket;
private final InputStream mmInStream;
private final OutputStream mmOutStream;
private Set<RoomOccupancy> mmSubscriptions;
private JsonHandler mmJsonHelper;
public ConnectedThread(Socket socket) {
Log.d(TAG, "create ConnectedThread");
mmSubscriptions = new HashSet<RoomOccupancy>();
mmSocket = socket;
InputStream tmpIn = null;
OutputStream tmpOut = null;
try {
tmpIn = socket.getInputStream();
tmpOut = socket.getOutputStream();
} catch (IOException e) {
Log.e(TAG, "temp sockets not created", e);
}
mmInStream = tmpIn;
mmOutStream = tmpOut;
}
public void run() {
Log.d(TAG, "BEGIN mConnectedThread");
byte[] buffer = new byte[BUFFER_LENGTH];
int bytes;
// Read header information, determine connection type
try {
bytes = mmInStream.read(buffer);
String header = new String(buffer, 0, bytes);
// determine request type
if (header.startsWith("GET ")) {
Log.d(TAG, "Found HTTP GET request");
doHttpConnection(header);
} else if (header.startsWith("JUNCTION")) {
Log.d(TAG, "Found Junction connection");
mmJsonHelper = new JsonSocketHandler(mmInStream, mmOutStream);
doJunctionConnection();
}
} catch (IOException e) {
Log.e(TAG, "Error reading connection header", e);
}
// No longer listening.
cancel();
}
private void doHttpConnection(String header) {
Log.d(TAG, "HTTP header:\n" + header);
String[] lines = header.split("\r\n");
boolean isWebSocket = false;
String origin = null;
String host = null;
String webSocketKey1 = null;
String webSocketKey2 = null;
for (String l : lines) {
if (l.startsWith("Upgrade: WebSocket")) {
isWebSocket = true;
}
if (l.startsWith("Sec-WebSocket-Key1: ")) {
webSocketKey1 = l.substring(20);
}
if (l.startsWith("Sec-WebSocket-Key2: ")) {
webSocketKey2 = l.substring(20);
}
if (l.startsWith("Origin: ")) {
origin = l.substring(8);
}
if (l.startsWith("Host: ")) {
host = l.substring(6);
}
}
if (!isWebSocket) {
Log.e(TAG, "Not a websocket request");
return;
}
long v1 = getKeyNumber(webSocketKey1);
long v2 = getKeyNumber(webSocketKey2);
int s1 = countSpaces(webSocketKey1);
int s2 = countSpaces(webSocketKey2);
if (v1 % s1 != 0 || v2 % s2 != 0) {
Log.e(TAG, "WebSocket failed handshake");
}
long p1 = v1 / s1;
long p2 = v2 / s2;
String p3 = header.substring(header.length()-8);
byte[] response = webSocketResponse(p1, p2, p3);
String endpoint = "ws://" + host + "/"; // TODO
this.write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n");
this.write("Upgrade: WebSocket\r\n");
this.write("Connection: Upgrade\r\n");
this.write("Sec-WebSocket-Origin: " + origin + "\r\n");
this.write("Sec-WebSocket-Location: " + endpoint + "\r\n");
this.write("\r\n");
this.write(response, response.length);
try {
mmOutStream.flush();
} catch (IOException e) {
Log.e(TAG, "Error completing handshake", e);
return;
}
// TODO: support length prefixed data?
mmJsonHelper = new JsonWebSocketHandler(mmInStream, mmOutStream);
doJunctionConnection();
}
private byte[] webSocketResponse(long p1, long p2, String p3) {
try {
byte[] challenge = new byte[16];
challenge[0] = (byte)( p1 >>> 24 );
challenge[1] = (byte)( (p1 << 8) >>> 24 );
challenge[2] = (byte)( (p1 << 16) >>> 24 );
challenge[3] = (byte)( (p1 << 24) >>> 24 );
challenge[4] = (byte)( p2 >>> 24 );
challenge[5] = (byte)( (p2 << 8) >>> 24 );
challenge[6] = (byte)( (p2 << 16) >>> 24 );
challenge[7] = (byte)( (p2 << 24) >>> 24 );
System.arraycopy(p3.getBytes(), 0, challenge, 8, 8);
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] resp = md.digest(challenge);
return resp;
} catch (Exception e) {
Log.e(TAG, "Error computing response to websocket challenge", e);
return null;
}
}
private long getKeyNumber(String key) {
long n = 0;
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);
if ('0' <= c && c <= '9') {
n = 10*n + (c-'0');
}
}
return n;
}
private int countSpaces(String key) {
int n = 0;
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);
if (c == ' ') n++;
}
return n;
}
private void doJunctionConnection() {
// Keep listening to the InputStream while connected
while (true) {
try {
JSONObject json = mmJsonHelper.jsonFromStream();
if (json == null) {
break;
}
handleJson(json);
} catch (IOException e) {
Log.e(TAG, "disconnected", e);
//connectionLost();
break;
}
}
}
private void handleJson(JSONObject json) {
try {
Log.d(TAG, json.toString());
if (json.has(Junction.NS_JX)) {
JSONObject jx = json.getJSONObject(Junction.NS_JX);
if (jx.has(Junction.JX_SYS_MSG)) {
JSONObject sys = jx.getJSONObject(Junction.JX_SYS_MSG);
// Join
if (sys.has("join")) {
boolean isCreator = false;
String roomName = sys.getString("join");
RoomId joinRoom = getRoomId(roomName);
String me = sys.getString("id");
synchronized (joinRoom) {
//Log.d(TAG, "Adding " + me.substring(0,6) + " to " + room);
mmSubscriptions.add(new RoomOccupancy(joinRoom, me));
Map<String, ConnectedThread> participants;
if (mSubscriptions.containsKey(joinRoom)) {
// Joining existing session
participants = mSubscriptions.get(joinRoom);
isCreator = false;
} else {
// New session
isCreator = true;
participants = new HashMap<String, ConnectedThread>();
mSubscriptions.put(joinRoom, participants);
JSONObject script = sys.optJSONObject("script");
if (script != null) {
mActivityScripts.put(joinRoom, script);
}
}
participants.put(me, this);
// Response
JSONObject script = null;
JSONObject joinedObj = new JSONObject();
JSONObject joinedMsg = new JSONObject();
try {
joinedObj.put(Junction.JX_SYS_MSG, true);
joinedObj.put(Junction.JX_JOINED, true);
joinedObj.put(Junction.JX_CREATOR, isCreator);
if (isCreator) {
script = mActivityScripts.get(joinRoom);
if (script != null) {
joinedObj.put(Junction.JX_SCRIPT, script);
}
}
joinedMsg.put(Junction.NS_JX, joinedObj);
mmJsonHelper.sendJson(joinedMsg);
} catch (Exception e) {
Log.e(TAG, "Error sending join response",e);
}
}
}
// Send message to session
String action = sys.optString("action");
if ("send_s".equals(action)) {
String session = sys.getString("session");
RoomId room = getRoomId(session);
jx.remove(Junction.JX_SYS_MSG);
synchronized(room) {
Map<String, ConnectedThread> peers = mSubscriptions.get(room);
for (String u : peers.keySet()) {
ConnectedThread conn = peers.get(u);
conn.sendJson(json);
}
}
}
if ("send_a".equals(action)) {
String session = sys.getString("session");
RoomId room = getRoomId(session);
String actor = sys.getString("actor");
jx.remove(Junction.JX_SYS_MSG);
synchronized(room) {
ConnectedThread conn = mSubscriptions.get(room).get(actor);
if (conn != null) {
conn.sendJson(json);
}
}
}
}
}
} catch (JSONException e) {
Log.e(TAG, "Error building json object", e);
}
}
public void sendJson(JSONObject json) {
try {
mmJsonHelper.sendJson(json);
} catch (Exception e) {
Log.e(TAG, "Error writing JSON", e);
}
}
/**
* Write to the connected OutStream.
* @param buffer The bytes to write
*/
public void write(byte[] buffer, int bytes) {
try {
mmOutStream.write(buffer, 0, bytes);
} catch (IOException e) {
Log.e(TAG, "Exception during write", e);
}
}
public void write(String buffer) {
try {
byte[] b = buffer.getBytes();
mmOutStream.write(b, 0, b.length);
} catch (IOException e) {
Log.e(TAG, "Exception during write", e);
}
}
public void cancel() {
try {
//synchronized(JXServer.this) {
for (RoomOccupancy entry : mmSubscriptions) {
synchronized(entry.room) {
Map<String, ConnectedThread> users = mSubscriptions.get(entry.room);
users.remove(entry.id);
if (users.size() == 0) {
mSubscriptions.remove(entry.room);
}
}
}
mmSubscriptions.clear();
mmSubscriptions = null;
//}
mmSocket.close();
} catch (IOException e) {
Log.e(TAG, "close() of connect socket failed", e);
}
}
}
public class RoomOccupancy {
public RoomId room;
public String id;
public RoomOccupancy(RoomId r, String me) {
room = r;
id = me;
}
}
/**
* Use a wrapper class so we can better trust locks
*/
public class RoomId {
public String name;
private RoomId(String name) {
this.name = name;
}
}
Map<String,RoomId> mRoomMap = new HashMap<String,RoomId>();
public RoomId getRoomId(String name) {
if (!mRoomMap.containsKey(name)) {
mRoomMap.put(name, new RoomId(name));
}
return mRoomMap.get(name);
}
public static class Log {
public static void d(String tag, String msg) {
System.out.println(tag + ": " + msg);
}
public static void e(String tag, String msg) {
System.err.println(tag + ": " + msg);
}
public static void e(String tag, String msg, Exception e) {
System.err.println(tag + ": " + msg);
e.printStackTrace(System.err);
}
}
}