您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10352|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;/ `* u. @( g6 V
  2. require_once __DIR__ . '/Workerman/Autoloader.php';# L: y6 U0 @7 v0 |: ^- `2 T! o
  3. 3 G8 l) d. f& U7 L
  4. $worker = new Worker();
    ) K) r5 A$ d1 _" i2 z* H$ G
  5. // 4个进程
    ' _* {. M4 L0 }8 ^9 ]
  6. $worker->count = 4;& q: S0 h# c* ?; A
  7. // 每个进程启动后在当前进程新增一个Worker监听
    : p% c- ?9 O  M- n3 t1 S
  8. $worker->onWorkerStart = function($worker)
    . D4 ?- v/ \/ U- y
  9. {2 u0 W! y& i2 X' T
  10.     /**
    6 V$ ?7 R- X* v8 |
  11.      * 4个进程启动的时候都创建2016端口的Worker; u' t: _4 @; X. O0 A
  12.      * 当执行到worker->listen()时会报Address already in use错误
    8 y0 S( K/ v4 a
  13.      * 如果worker->count=1则不会报错$ `+ N$ d, q0 G3 f3 q2 ~  N2 Y- Y
  14.      */
    + |. o5 l4 J$ M+ O3 w8 s, G7 o
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');0 c0 @! P3 m* f
  16.     $inner_worker->onMessage = 'on_message';
    3 K! }$ [' z4 R6 `# R% R. d  o$ y
  17.     // 执行监听。这里会报Address already in use错误
    $ f& H) y1 m! K# T: i$ V7 h8 y
  18.     $inner_worker->listen();* Y" n6 |7 z7 l4 B6 J- G
  19. };
    , Y  N+ l. b0 Q% g7 ?+ A
  20.   I& z  k+ R' ?3 J7 m0 h( e$ C
  21. $worker->onMessage = 'on_message';  D9 n8 O3 {8 S, E1 Y+ F* z
  22. ( q8 J# `. `3 ?, R
  23. function on_message($connection, $data)" P( @: r' C5 G9 L8 s1 N
  24. {9 F: F0 G( B5 W  l% w. w; l( z7 v
  25.     $connection->send("hello\n");
    - v' g! ]) U# a) o
  26. }1 v. ?+ k+ ~2 S! I; Y/ k# A
  27. # f2 X1 h  {5 M2 |5 Z" _
  28. // 运行worker
    & z0 p* g& y) z2 @6 r
  29. Worker::runAll();
    $ q% l$ D+ a! h. b" w$ o1 l
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    $ M' _; K- p  x% ?2 E5 c

  31. 7 L( H: }; l6 r. @
  32. use Workerman\Worker;; b: T9 w* f: l7 j( X
  33. require_once './Workerman/Autoloader.php';9 @! _8 ]9 ^" M( \6 g

  34. ( f0 G# o. V3 h0 {+ P
  35. $worker = new Worker('text://0.0.0.0:2015');
    1 N/ A* Q: e& I" v* T7 u
  36. // 4个进程
    ! H1 Z% [6 B1 t" `
  37. $worker->count = 4;/ q" ~' ^) x! G/ B, C+ B
  38. // 每个进程启动后在当前进程新增一个Worker监听
    0 a! k  h: r$ f4 ^: K$ p
  39. $worker->onWorkerStart = function($worker)5 a9 Q- _2 ]2 p4 B" P# R+ h
  40. {' F" T0 G, m8 }$ H; v6 P5 X
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');6 x& z, K( N! }9 p* u  w0 t
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    * E! c1 d: J- q/ I7 h- L( x) p) W
  43.     $inner_worker->reusePort = true;6 ^7 A7 R) s! @8 a9 U; ~
  44.     $inner_worker->onMessage = 'on_message';
    3 S1 K/ g; T- w& B4 x6 c, F0 i: f
  45.     // 执行监听。正常监听不会报错
    ' ^# N/ p+ g; n; N' B
  46.     $inner_worker->listen();
    7 t. `& d3 a$ c; }5 `  }2 E" M
  47. };0 ?. \/ `4 J  S- Y& u3 o! z

  48. 7 d1 x3 Q1 B* r; T  Q: H
  49. $worker->onMessage = 'on_message';
    + e& i. H5 B& ^1 |3 Y9 g3 Y- ]
  50. 2 S$ d( U- V0 y* O
  51. function on_message($connection, $data)* Z& ?0 n  {! n% A+ Q8 F
  52. {
    # j( H5 A# H5 R' Z& `5 M
  53.     $connection->send("hello\n");! x) ~, L0 T/ P& e0 ~9 ^
  54. }! ^6 }* L9 z, Q3 h; P3 M- q
  55. ; V/ q1 _  F* ~- V
  56. // 运行worker
    # a; l) j- N7 D7 k' x" ?
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php. S/ X5 \- O0 i  `% o% G8 A7 m7 f
  2. use Workerman\Worker;/ k1 F$ X# w5 b+ a
  3. require_once './Workerman/Autoloader.php';4 R. W% q& [$ t& T! n' d
  4. // 初始化一个worker容器,监听1234端口
    9 Y% w- P1 W7 D0 z
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    + z, F. P6 Q2 u* u+ ^7 W

  6. ' c* `8 O: z! s: s
  7. /*
    # ]4 z# F7 P( h2 y
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误8 b' m1 d9 P$ c7 I! ~/ T
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    9 B. v8 H  `7 O0 C3 O
  10. */
    2 t9 ?4 n) h4 t  v
  11. $worker->count = 1;5 S; r" }- i4 O; g
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    9 ?: X  W1 f+ W3 q/ q% H
  13. $worker->onWorkerStart = function($worker)
    6 C6 j8 {& @9 @2 B* \3 t7 S& V
  14. {$ C  @+ V8 G  O9 }
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符+ i; d( z$ m' U& W
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    9 l5 C  k$ z$ U
  17.     $inner_text_worker->onMessage = function($connection, $buffer)( q9 r* f* R% p$ A
  18.     {1 O; l/ J% x; Q0 e# v$ h
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    $ r5 U# F/ n3 b' h8 N2 k4 m
  20.         $data = json_decode($buffer, true);
    - C2 X- d: p1 ~! j( f+ T
  21.         $uid = $data['uid'];+ L1 P% y$ }' n: r; K
  22.         // 通过workerman,向uid的页面推送数据2 A5 X2 y, v0 R$ P( j# Z
  23.         $ret = sendMessageByUid($uid, $buffer);
    ' G% L) J. s0 s# |& B* M
  24.         // 返回推送结果* _& e4 j! r1 m/ ^  m# W
  25.         $connection->send($ret ? 'ok' : 'fail');
    5 u6 M- [) l  R9 ^2 V# g
  26.     };( H7 ?' N$ S! A* l5 W+ X5 w# |) P
  27.     // ## 执行监听 ##
    % ^0 j6 W# n  X5 `7 z
  28.     $inner_text_worker->listen();
    ' ~& [9 r/ h; g! G9 P
  29. };4 g: k! J, S# ]8 g. s0 o. e
  30. // 新增加一个属性,用来保存uid到connection的映射
    5 y8 f3 O8 b0 A8 T: f1 f& q
  31. $worker->uidConnections = array();
    % z3 Y! B  R( `
  32. // 当有客户端发来消息时执行的回调函数
    8 w. s$ a  [- E  P7 j% h6 L
  33. $worker->onMessage = function($connection, $data)
    * F9 ^& S  Q6 k8 j; p1 m& g
  34. {
    . }  `( C+ S9 m: \0 t& I( h
  35.     global $worker;& h5 Y/ d  {; r2 h: z. F( r
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    # n) V" H9 M3 K! S8 o# W1 s" a
  37.     if(!isset($connection->uid)). _. L/ N% F# `5 u6 o9 K3 {
  38.     {
    ) L, z5 F; s& F' u5 Z/ |0 V# Z% N
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    4 R+ ~0 z9 J& J
  40.        $connection->uid = $data;
    - k$ ]- O& M; ?7 ?3 s* v' r
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,4 a4 J0 k; k8 C- Y$ o- e/ \  r
  42.         * 实现针对特定uid推送数据: s" l2 d9 x1 F
  43.         */
    2 l( B) [( C* y, H1 d5 R9 O7 N
  44.        $worker->uidConnections[$connection->uid] = $connection;% e8 g- F2 N' ~2 L/ Z! Y; I% C
  45.        return;/ A1 ^* l+ o( X5 O
  46.     }$ @. r' L6 _$ o0 v( [9 m' H& w
  47. };
    ) k2 e- G5 N4 Y$ R

  48. 3 u2 E  s1 a, ~3 E
  49. // 当有客户端连接断开时0 w1 V5 D1 D  P& s; b$ e3 X
  50. $worker->onClose = function($connection); o2 f1 c4 ~6 a5 a
  51. {9 U) _: g+ Q, |( O
  52.     global $worker;
    ) w$ _; i9 y0 B4 D( H, h7 p; h
  53.     if(isset($connection->uid))
    $ F/ {& t* J5 |+ V# K( e
  54.     {' c5 ]8 i' \* h; v8 ?) u
  55.         // 连接断开时删除映射4 z6 C+ R) Y, X4 N6 T% L
  56.         unset($worker->uidConnections[$connection->uid]);
    0 ~" Z' N9 b. ?4 e8 `1 x1 `9 _
  57.     }
    + n+ L3 S( O  q) @. S& H( e
  58. };4 N! f2 Q; [3 I, P- I- H0 J' r
  59. " D' t/ s8 S' h# V
  60. // 向所有验证的用户推送数据
      A+ f2 r) s& E* D# m7 b2 c2 v# \
  61. function broadcast($message)
    0 r; U: O; ]+ R) D7 m6 P
  62. {3 I$ w! E$ _# I; X3 m
  63.    global $worker;
    2 w* X7 i# o+ _% M
  64.    foreach($worker->uidConnections as $connection)
    9 r, D9 k) u* v  t+ p) m6 @
  65.    {1 G7 J0 f$ n) p+ l
  66.         $connection->send($message);
    ( M9 P* L! e1 Y  m/ {6 T# R8 F
  67.    }
    # ^1 B  N, Z5 d
  68. }
    & `" w1 m3 }0 R
  69. & R1 {- X+ |& M: W* k' U, ?: D/ d& B
  70. // 针对uid推送数据
    6 w% {& {4 [; I% ^5 X- i9 C
  71. function sendMessageByUid($uid, $message)
      y) {7 B, ?: o$ N
  72. {
    # W3 P* n0 B9 J; ^2 m, l7 v
  73.     global $worker;
    ! r) M) T3 M2 t) E) E& S
  74.     if(isset($worker->uidConnections[$uid]))
    - z' Z. o* {  ^# O7 z3 @6 E
  75.     {" e# z" E6 h& B
  76.         $connection = $worker->uidConnections[$uid];
    / X7 s! J' r0 t# Y9 t
  77.         $connection->send($message);
    4 m) u, M% Y) j, R7 w
  78.         return true;
    1 X' |5 I$ K0 ^5 |" {, v6 q
  79.     }
    0 w: c+ b$ b7 j1 h, Z6 R1 N) K* |" G
  80.     return false;  f2 c/ l# v3 H5 Z% e# D. H" W( [
  81. }: K* A0 X7 y/ c8 t/ F

  82. 3 a4 I. K* k9 w4 {6 u
  83. // 运行所有的worker/ Q* y0 m- v& [" {" f/ C
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');) H; c6 l1 _5 M/ B% U8 I
  2. ws.onopen = function(){5 N0 }) i- S$ ]
  3.     var uid = 'uid1';
    , X# Z  a$ ]7 Q( i: B
  4.     ws.send(uid);
    / a4 M: i, F( E3 F# N7 ?* j
  5. };8 G. s+ W( H; |
  6. ws.onmessage = function(e){' l# `# M. _$ q, ^- D7 X
  7.     alert(e.data);
    . D; c4 \( H: S/ C; a# N
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    1 X7 z" ^; M' g. v9 ]1 E
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);! ~* v2 c4 b1 d; ^- E' h# G
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    + o+ ^6 L' r' i( a+ ~
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');/ d' ]' v+ m) v# x4 u
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 J/ ~; O" G, j% f& B
  6. fwrite($client, json_encode($data)."\n");
    + B  S' F  {8 u* W* }
  7. // 读取推送结果
    : W2 W: z& T& R5 K$ h0 `
  8. echo fread($client, 8192);
复制代码

" @9 _6 ^. i/ Y' q
& L" E  W% T* m4 u  Y; X! `
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-6 12:25 , Processed in 0.112708 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!