package com.it18zhang.server;
import java.io.IOException;
import java.net.Socket;
import com.it18zhang.common.Message;
import com.it18zhang.util.SocketUtil;
/**
 * Server-side message pusher: broadcasts a message to every socket currently
 * held in {@code MessageServer.allSockets}.
 */
public class MessagePusher {
    /**
     * Pushes {@code msg} to all registered sockets.
     *
     * <p>One short-lived thread is started per socket so that a slow client does
     * not delay delivery to the others. Writes to a single socket are serialized,
     * so messages pushed at nearly the same time cannot interleave their bytes on
     * that connection. Sockets that have already been closed locally are skipped.
     *
     * @param msg the message whose raw pack is written to every client
     */
    public static void push(final Message msg) {
        // Walk over every registered client socket.
        for (final Socket s : MessageServer.allSockets) {
            if (s.isClosed()) {
                // Nothing can be written to a locally closed socket; avoid a useless thread.
                continue;
            }
            new Thread() {
                @Override
                public void run() {
                    try {
                        // Several pushes may be in flight for the same client, each on its
                        // own thread; hold the socket's monitor while writing so one
                        // message is written completely before the next one starts.
                        synchronized (s) {
                            SocketUtil.writeMessage(s.getOutputStream(), msg);
                        }
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }
    }
}
package com.it18zhang.server;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import com.it18zhang.util.SocketUtil;
/**
 * Message server: accepts client connections, records each client in the shared
 * friend/socket lists, announces the updated friend list and starts one receiver
 * thread per connection.
 */
public class MessageServer {
    // Address strings of the connected clients (the "friend list").
    public static List<String> friends = new ArrayList<String>();
    // All accepted client sockets. The accept loop appends here while receiver
    // threads iterate the list through MessagePusher.push, so a copy-on-write
    // list is used: iteration works on a snapshot and never fails with
    // ConcurrentModificationException.
    public static List<Socket> allSockets = new java.util.concurrent.CopyOnWriteArrayList<Socket>();

    /**
     * Starts the server and accepts clients until accepting fails.
     *
     * <p>This method blocks the calling thread. Any exception that ends the
     * accept loop is printed (it was previously swallowed silently) and the
     * listening socket is closed before returning.
     *
     * @param port TCP port to listen on
     */
    public void start(int port) {
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
            System.out.println("服务器启动了!!");
            while (true) {
                Socket sock = ss.accept();
                // Register the new client in the friend list.
                friends.add(SocketUtil.getAddr(sock));
                // Keep track of the socket so messages can be pushed to it.
                allSockets.add(sock);
                // Tell every client about the updated friend list.
                MessagePusher.push(new LatestFriendsServerMessage(friends));
                // Handle this client's incoming messages on its own thread.
                new ServerReceiverThread(sock).start();
            }
        }
        catch (Exception e) {
            // Make the reason the server stopped visible instead of hiding it.
            e.printStackTrace();
        }
        finally {
            // Release the listening port when the loop ends.
            if (ss != null) {
                try {
                    ss.close();
                }
                catch (Exception closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }
}
package com.it18zhang.server;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import com.it18zhang.common.Message;
import com.it18zhang.common.MessageFactory;
import com.it18zhang.util.SocketUtil;
import sun.awt.windows.ThemeReader;
/**
 * Server receiver thread: reads the messages one client sends and re-broadcasts
 * them through {@code MessagePusher}.
 */
public class ServerReceiverThread extends Thread {
    private Socket sock ;
    private InputStream is ;

    /**
     * Binds the thread to a client socket and fetches its input stream.
     * If the stream cannot be obtained the error is printed and {@link #run()}
     * terminates immediately.
     *
     * @param sock the accepted client socket
     */
    public ServerReceiverThread(Socket sock) {
        try {
            this.sock = sock ;
            this.is = sock.getInputStream() ;
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Receive loop. Runs until the client's stream ends or an I/O error occurs,
     * then closes the socket. Previously the loop never ended: once the client
     * was gone every iteration failed and printed a stack trace forever.
     *
     * <p>Note: the socket is only closed here; it is not removed from
     * {@code MessageServer.allSockets}.
     */
    @Override
    public void run() {
        if (is == null) {
            // The constructor could not obtain the stream; nothing to read.
            closeSocket();
            return;
        }
        // One byte of push-back lets the loop test for end-of-stream before
        // handing the stream, untouched, to the message decoder.
        java.io.PushbackInputStream in = new java.io.PushbackInputStream(is);
        while (true) {
            try {
                int head = in.read();
                if (head < 0) {
                    // End of stream: the client has disconnected.
                    break;
                }
                in.unread(head);
                // Decode the next pack into a server message.
                Message msg = MessageFactory.tranformPack(in);
                switch (msg.getType()) {
                    case Message.MESSAGE_TYPE_SERVER_TALK :
                    case Message.MESSAGE_TYPE_SERVER_REFRESHFRIENDS :
                        MessagePusher.push(msg);
                        break ;
                    default :
                        // Other types are ignored, as before.
                        break ;
                }
            }
            catch (IOException e) {
                // The connection is broken; retrying cannot succeed.
                e.printStackTrace();
                break;
            }
            catch (Exception e) {
                // A single undecodable message is reported and skipped, as before.
                e.printStackTrace();
            }
        }
        closeSocket();
    }

    /** Closes the client socket, printing (not propagating) any failure. */
    private void closeSocket() {
        if (sock == null) {
            return;
        }
        try {
            sock.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.it18zhang.server;
public class StartServer {
public static void main(String[] args) {
//消息服务器
MessageServer server = new MessageServer();
//启动消息服务器
server.start(1234);
}
}
package com.it18zhang.server;
import com.it18zhang.common.Message;
/**
* 服务器端推送给客户端的talk消息
*/
public class TalkServerMessage extends Message {
public TalkServerMessage(String text){
//处理消息
this.setType(Message.MESSAGE_TYPE_SERVER_TALK) ;
byte[] bytes = text.getBytes() ;
this.setContent(bytes);
}
/**
* 直接通过原生报文构造消息对象
*/
public TalkServerMessage(byte[] pack){
this.setPack(pack);
pack[0] = Message.MESSAGE_TYPE_SERVER_TALK ;
}
/**
* 直接通过原生报文构造消息对象
*/
public TalkServerMessage(int length,byte[] content){
this.setType(Message.MESSAGE_TYPE_SERVER_TALK) ;
this.setContent(content);
}
public byte[] genMessagePack() {
if(getPack() != null){
return getPack();
}
else{
return super.genMessagePack();
}
}
public Object getData() {
return new String(getContent());
}
}
package com.it18zhang.util;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;
import java.util.List;
import com.it18zhang.common.Message;
/**
 * Utility methods for reading and writing the chat protocol over sockets
 * and streams.
 */
public class SocketUtil {
    /**
     * Returns a short address string for the remote end of a socket:
     * the part of the remote IP address after its last dot, a colon, and the
     * remote port (for example {@code "7:5555"} for 10.0.0.7, port 5555).
     *
     * <p>The textual IP address is used rather than the host name:
     * {@code getHostName()} may perform a blocking reverse DNS lookup and
     * returns a name (for instance {@code "pc.example.com"}) whose last
     * dot-separated part is not an address component.
     *
     * @param sock a connected socket
     * @return the short address string
     */
    public static String getAddr(Socket sock){
        String ip = sock.getInetAddress().getHostAddress() ;
        String lastPart = ip.substring(ip.lastIndexOf(".") + 1);
        return lastPart + ":" + sock.getPort() ;
    }

    /**
     * Reads one byte from the stream and returns it as the message type
     * (0 - friend list, 1 - text content).
     *
     * @param is the stream to read from
     * @return the type byte, or -1 at end of stream or on an I/O error
     */
    public static int readMsgType(InputStream is){
        try {
            return is.read() ;
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return -1 ;
    }

    /**
     * Reads a 4-byte big-endian message length.
     *
     * <p>The stream is read until all four bytes have arrived; a single
     * {@code read} call may deliver fewer bytes than requested.
     *
     * @param is the stream to read from
     * @return the decoded length, or -1 if the stream ends before four bytes
     *         were read or an error occurs
     */
    public static int readMsglength(InputStream is){
        try {
            byte[] buf = new byte[4];
            if (readFully(is, buf) < buf.length) {
                // Stream ended early: report failure instead of a bogus length.
                return -1 ;
            }
            // Highest byte
            int s3 = buf[0] << 24 ;
            // Second-highest byte
            int s2 = (buf[1] & 0xFF) << 16 ;
            // Second-lowest byte
            int s1 = (buf[2] & 0xFF) << 8 ;
            // Lowest byte
            int s0 = buf[3] & 0xFF ;
            return s3 | s2 | s1 | s0 ;
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return -1 ;
    }

    /**
     * Reads exactly {@code len} bytes of message content.
     *
     * @param is  the stream to read from
     * @param len the number of bytes to read
     * @return the bytes read, or {@code null} if {@code len} is negative, the
     *         stream ends before {@code len} bytes were read, or an I/O error
     *         occurs
     */
    public static byte[] readMessage(InputStream is,int len){
        if (len < 0) {
            // A negative length is the error value of readMsglength; nothing to read.
            return null ;
        }
        try {
            byte[] bytes = new byte[len];
            if (readFully(is, bytes) < len) {
                // Truncated message: do not hand back a partly filled buffer.
                return null ;
            }
            return bytes ;
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return null ;
    }

    /**
     * Reads from the stream until {@code buf} is full or the stream ends.
     *
     * @return the number of bytes actually stored in {@code buf}
     */
    private static int readFully(InputStream is, byte[] buf) throws IOException {
        int total = 0 ;
        while (total < buf.length) {
            int n = is.read(buf, total, buf.length - total);
            if (n < 0) {
                break ;
            }
            total += n ;
        }
        return total ;
    }

    /**
     * Deserializes a byte array into a friend list using Java object
     * serialization.
     *
     * @param bytes the serialized list
     * @return the list, or {@code null} if deserialization fails
     */
    public static List<String> deserializeFriends(byte[] bytes){
        // NOTE(review): Java native deserialization is unsafe on untrusted data;
        // if these bytes can come from an untrusted peer, consider an object
        // input filter or a plain text format.
        try {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            @SuppressWarnings("unchecked")
            List<String> list = (List<String>)ois.readObject();
            ois.close();
            return list ;
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return null ;
    }

    /**
     * Decodes bytes into a string using the platform default charset.
     *
     * @param bytes the encoded text
     * @return the decoded string
     */
    public static String readString(byte[] bytes){
        return new String(bytes);
    }

    /**
     * Decodes bytes into a string using the named charset.
     *
     * @param bytes   the encoded text
     * @param charset the charset name
     * @return the decoded string, or {@code null} if the charset is not supported
     */
    public static String readString(byte[] bytes,String charset){
        try {
            return new String(bytes,charset);
        }
        catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null ;
    }

    /**
     * Writes a text message to the stream: a 4-byte big-endian length followed
     * by the text bytes (platform default charset), then flushes. Errors are
     * printed, not propagated.
     *
     * @param os  the stream to write to
     * @param txt the text to send
     */
    public static void writeMessage(OutputStream os , String txt){
        try {
            byte[] bytes = txt.getBytes();
            // Length prefix first
            os.write(int2ByteArr(bytes.length));
            // Then the payload
            os.write(bytes);
            // Flush like the Message overload does, so buffered streams send at once.
            os.flush();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes the raw pack of a {@code Message} to the stream and flushes.
     * Errors are printed, not propagated.
     *
     * @param os  the stream to write to
     * @param msg the message whose pack is sent
     */
    public static void writeMessage(OutputStream os , Message msg){
        try {
            os.write(msg.genMessagePack());
            os.flush();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Converts an int into a 4-byte big-endian array.
     *
     * @param i the value to convert
     * @return four bytes, most significant first
     */
    public static byte[] int2ByteArr(int i){
        byte[] bytes = new byte[4] ;
        bytes[0] = (byte)(i >> 24) ;
        bytes[1] = (byte)(i >> 16) ;
        bytes[2] = (byte)(i >> 8) ;
        bytes[3] = (byte)(i >> 0) ;
        return bytes ;
    }
}