package com.it18zhang.server;

import java.io.IOException;

import java.net.Socket;

import com.it18zhang.common.Message;

import com.it18zhang.util.SocketUtil;

/**

 * 服务器消息推送器

 */

public class MessagePusher {

/**

 * 推送消息 

 */

public static void push(final Message msg){

//遍历所有socket

for(final Socket s : MessageServer.allSockets){

new Thread(){

public void run() {

try {

SocketUtil.writeMessage(s.getOutputStream(), msg);

}

catch (IOException e) {

e.printStackTrace();

}

}

}.start();

}

}

}

package com.it18zhang.server;

import java.net.InetAddress;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.ArrayList;

import java.util.List;

import com.it18zhang.util.SocketUtil;

/**

 * 

 */

public class MessageServer {

//客户端集合

public static List<String> friends  = new ArrayList<String>();

//存放所有socket

public static List<Socket> allSockets = new ArrayList<Socket>();

//启动

public void start(int port){

try {

ServerSocket ss = new ServerSocket(port);

System.out.println("服务器启动了!!");

while(true){

Socket sock = ss.accept();

//添加好友

friends.add(SocketUtil.getAddr(sock));

//维护socket集合

allSockets.add(sock);

MessagePusher.push(new LatestFriendsServerMessage(friends));

//分配线程,单独处理

new ServerReceiverThread(sock).start();

}

}

catch (Exception e) {

}

}

}

package com.it18zhang.server;

import java.io.IOException;

import java.io.InputStream;

import java.net.Socket;

import com.it18zhang.common.Message;

import com.it18zhang.common.MessageFactory;

import com.it18zhang.util.SocketUtil;

import sun.awt.windows.ThemeReader;

/**

 * 服务器接收线程.

 */

public class ServerReceiverThread extends Thread {

private Socket sock ;

private InputStream is ;

public ServerReceiverThread(Socket sock) {

try {

this.sock = sock ;

this.is = sock.getInputStream() ;

}

catch (Exception e) {

e.printStackTrace();

}

}

public void run() {

while(true){

//

try {

//转成服务器消息

Message msg = MessageFactory.tranformPack(is);

//

int type = msg.getType();

switch(type){

case Message.MESSAGE_TYPE_SERVER_TALK : 

MessagePusher.push(msg);

break ;

case Message.MESSAGE_TYPE_SERVER_REFRESHFRIENDS:

MessagePusher.push(msg);

break ;

}

}

catch (Exception e) {

e.printStackTrace();

}

}

}

}

package com.it18zhang.server;

/**
 * Command-line entry point that starts the message server.
 */
public class StartServer {

    // Port used when no usable port is given on the command line.
    private static final int DEFAULT_PORT = 1234;

    /**
     * Starts the message server.
     *
     * @param args optional; {@code args[0]} may hold the TCP port to listen on
     *             (1-65535). Without arguments, or with an unusable value, the
     *             server listens on port 1234.
     */
    public static void main(String[] args) {
        MessageServer server = new MessageServer();
        server.start(resolvePort(args));
    }

    // Picks the listening port from the arguments, falling back to the default.
    private static int resolvePort(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_PORT;
        }
        try {
            int port = Integer.parseInt(args[0].trim());
            if (port >= 1 && port <= 65535) {
                return port;
            }
        } catch (NumberFormatException ignored) {
            // Reported below together with the out-of-range case.
        }
        System.err.println("Invalid port '" + args[0] + "', using " + DEFAULT_PORT);
        return DEFAULT_PORT;
    }
}

package com.it18zhang.server;

import com.it18zhang.common.Message;

/**

 * 服务器端推送给客户端的talk消息

 */

public class TalkServerMessage extends Message {

public TalkServerMessage(String text){

//处理消息

this.setType(Message.MESSAGE_TYPE_SERVER_TALK) ;

byte[] bytes = text.getBytes() ;

this.setContent(bytes);

}

/**

 * 直接通过原生报文构造消息对象

 */

public TalkServerMessage(byte[] pack){

this.setPack(pack);

pack[0] = Message.MESSAGE_TYPE_SERVER_TALK ;

}

/**

 * 直接通过原生报文构造消息对象

 */

public TalkServerMessage(int length,byte[] content){

this.setType(Message.MESSAGE_TYPE_SERVER_TALK) ;

this.setContent(content);

}

public byte[] genMessagePack() {

if(getPack() != null){

return getPack();

}

else{

return super.genMessagePack();

}

}

public Object getData() {

return new String(getContent());

}

}

package com.it18zhang.util;

import java.io.ByteArrayInputStream;

import java.io.IOException;

import java.io.InputStream;

import java.io.ObjectInputStream;

import java.io.OutputStream;

import java.io.UnsupportedEncodingException;

import java.net.Socket;

import java.util.List;

import com.it18zhang.common.Message;

/**

 * 工具类

 */

public class SocketUtil {

/**

 * 返回地址串

 */

public static String getAddr(Socket sock){

String ip = sock.getInetAddress().getHostName() ;

String ip0 = ip.substring(ip.lastIndexOf(".") + 1);

return ip0 + ":" + sock.getPort() ;

}

/**

 * 从输入流is的当前读取byte,处理成消息类型.

 * 0-好友列表  1-文本内容

 */

public static int readMsgType(InputStream is){

try {

return is.read() ;

}

catch (IOException e) {

e.printStackTrace();

}

return -1 ;

}

/**

 * 读取消息的长度

 */

public static int readMsglength(InputStream is){

try {

byte[] buf = new byte[4];

is.read(buf);

//最高位

int s3 =  buf[0] << 24 ;

//次高位

int s2 =  (buf[1] & 0xFF) << 16  ;

//次低位

int s1 =  (buf[2] & 0xFF) << 8 ;

//最低位

int s0 =  buf[3] & 0xFF ;

return s3 | s2 | s1 | s0 ;

}

catch (Exception e) {

e.printStackTrace();

}

return -1 ;

}

/**

 * 读取指定消息长度的内容

 */

public static byte[] readMessage(InputStream is,int len){

try {

byte[] bytes = new byte[len];

is.read(bytes);

return bytes ;

}

catch (IOException e) {

e.printStackTrace();

}

return null ;

}

/**

 * 反序列化字节数组成好友列表

 */

public static List<String> deserializeFriends(byte[] bytes){

try {

ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));

List<String> list= (List<String>)ois.readObject();

ois.close();

return list ;

}

catch (Exception e) {

e.printStackTrace();

}

return null ;

}

/**

 * 读取字符串

 */

public static String readString(byte[] bytes){

return new String(bytes);

}

/**

 * 使用指定的字符集读取字符串

 */

public static String readString(byte[] bytes,String charset){

try {

return new String(bytes,charset);

}

catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

return null ;

}

/**

 * 发送消息到os中

 */

public static void writeMessage(OutputStream os , String txt){

try {

byte[] bytes = txt.getBytes();

//先写入字节数组的长度

os.write(int2ByteArr(bytes.length));

//再写入字节

os.write(bytes);

}

catch (Exception e) {

e.printStackTrace();

}

}

/**

 * 发送Message的原始报文到os中

 */

public static void writeMessage(OutputStream os , Message msg){

try {

os.write(msg.genMessagePack());

os.flush();

}

catch (Exception e) {

e.printStackTrace();

}

}

/**

 * 整型转换成字节数组

 */

public static byte[] int2ByteArr(int i){

byte[] bytes = new byte[4] ;

bytes[0] = (byte)(i >> 24) ;

bytes[1] = (byte)(i >> 16) ;

bytes[2] = (byte)(i >> 8) ;

bytes[3] = (byte)(i >> 0) ;

return bytes ;

}

}