您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10491|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    % d' J4 s, P5 |$ T3 ]2 `  ?: |
  2. require_once __DIR__ . '/Workerman/Autoloader.php';( R& P! U' w' @

  3. 5 U! Q" l+ O1 K6 m
  4. $worker = new Worker();; s3 F3 s1 W. x/ v- W
  5. // 4个进程
    ! j4 m& d9 e$ m6 r# H: V( @
  6. $worker->count = 4;% i; \' q0 H+ w4 `/ D% a
  7. // 每个进程启动后在当前进程新增一个Worker监听* M6 l+ p& g8 U- b( ~( [* l5 L
  8. $worker->onWorkerStart = function($worker)+ p6 ^8 J( x1 L! \4 F
  9. {
    " N' f2 J) n9 S9 R: ^! Y1 E) ^/ Y
  10.     /**  |2 ~8 l; l/ v5 A1 k( Q$ f
  11.      * 4个进程启动的时候都创建2016端口的Worker3 _# }$ p$ i" a+ [! n/ s
  12.      * 当执行到worker->listen()时会报Address already in use错误
    6 L# o8 Y2 h1 G$ U+ B
  13.      * 如果worker->count=1则不会报错
      `9 f% L& w" C: [
  14.      */; `: G2 o1 j+ P# Q5 Y- x
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');( I5 v, j' d* S+ Q
  16.     $inner_worker->onMessage = 'on_message';
    * K  b0 r# `4 P8 y' L. X1 E- p$ @
  17.     // 执行监听。这里会报Address already in use错误
    + l. @6 T; u8 ^2 m+ e  N
  18.     $inner_worker->listen();
    . d5 G) X5 |- _" l' o6 Z
  19. };
    , D/ z9 a1 u, l

  20. ; b0 e+ u$ p$ n( @4 V0 P
  21. $worker->onMessage = 'on_message';* P0 U3 b6 E5 Q6 G* f6 I3 ^/ C

  22. ( h; ^! x1 o( A" r' z
  23. function on_message($connection, $data)
    ) L& D) }, P; m0 |
  24. {; Y: u# g6 G( i' k3 H, b, r: a
  25.     $connection->send("hello\n");* ]: t% i! U1 K1 E  o3 B5 n
  26. }4 Y/ g6 V+ q' Q+ q7 b0 g, d% l1 n
  27. 2 e1 F+ J6 x4 O  S% r
  28. // 运行worker3 E$ n! [: o0 i6 J% W+ I- z$ j
  29. Worker::runAll();
    # N% O& q+ f2 g
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    6 l, L  R7 W4 y/ h
  31. ' _$ i4 ?' }( @9 f' p$ h
  32. use Workerman\Worker;! D6 Z* s$ D" G9 ^& s8 u
  33. require_once './Workerman/Autoloader.php';8 g1 T' B* {8 t, T. l8 w" u6 ~
  34. & B3 \" j: `1 h3 R
  35. $worker = new Worker('text://0.0.0.0:2015');: M" C& l6 K# ]) E2 c2 V8 ]
  36. // 4个进程
    6 A( y8 f# B% o0 L+ o3 z
  37. $worker->count = 4;; [3 B* h3 @! J% g
  38. // 每个进程启动后在当前进程新增一个Worker监听
    9 [+ }4 c7 Z$ Q
  39. $worker->onWorkerStart = function($worker)
    . P) s0 E. z% C* h
  40. {* O2 ?; @" L  q# `
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    2 s7 A  _8 W: Q- [8 x' {, k, ]
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ; p6 R0 D" E* n
  43.     $inner_worker->reusePort = true;
    ! j: u/ o+ G& x% R7 s
  44.     $inner_worker->onMessage = 'on_message';
      F8 B4 d& b8 a9 y  j( _. `* Y
  45.     // 执行监听。正常监听不会报错
      A8 z5 e0 q( t: X
  46.     $inner_worker->listen();
    . N" [4 U5 E5 Y" M; J
  47. };2 w) y5 ~) {3 }. y& K
  48. # _' T# e7 d$ ]  g
  49. $worker->onMessage = 'on_message';9 c+ U9 z5 M  {1 [( E8 W
  50. 1 h6 E3 Q2 {: w4 r; E5 P% j7 X9 Z
  51. function on_message($connection, $data)5 I# t" {. J4 H' V& f3 q( M. k
  52. {
    6 q" k" Q/ V2 B2 ?& F
  53.     $connection->send("hello\n");' g9 p" b, i% P% x! n
  54. }
    / s; H8 V5 e4 V6 f. u
  55. 4 M. w' t& a8 u3 p
  56. // 运行worker
    - p, J6 A& e- |
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    ( m, H: U0 |" W) e, V- o
  2. use Workerman\Worker;# V( P$ S4 G7 Y, m6 k
  3. require_once './Workerman/Autoloader.php';
    # P$ \/ q7 {9 B
  4. // 初始化一个worker容器,监听1234端口
    # T' W0 u+ j9 l1 Y
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    - r! Z( ~: L2 t9 W

  6. . D, x0 C  s/ c6 @: T9 U
  7. /*2 `  ]9 z- E2 Y# e) p! l6 ~
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    * {, j; Y! y0 }+ U1 m. s
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)% s4 z  b1 m4 Y$ W
  10. */
    , r. o2 c0 @1 s# X
  11. $worker->count = 1;5 [( m2 S/ }0 W, u2 g! k" X# V# S
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 B, ^) M/ x/ P" m, r/ F
  13. $worker->onWorkerStart = function($worker)
    ( A1 B# J. k# D0 L( ~$ t) t; C
  14. {& R& l) {2 J. \) J
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符5 i/ [  q! |% l
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');! p1 N5 ~& [8 K/ O
  17.     $inner_text_worker->onMessage = function($connection, $buffer)) T5 N( F3 |; n& D9 k0 A& q
  18.     {
    9 U! }( [) K0 f3 e# i4 U3 `- B
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据/ u0 I- \( {$ d
  20.         $data = json_decode($buffer, true);# q9 ~( D$ [: v$ }7 x# x- ?5 {; v
  21.         $uid = $data['uid'];
    ' `( V+ [) {5 N
  22.         // 通过workerman,向uid的页面推送数据
    ! _2 j/ V" T9 K, ?
  23.         $ret = sendMessageByUid($uid, $buffer);- H7 Y0 }% V& E, G+ x
  24.         // 返回推送结果) h0 r" G* q6 o6 K2 H/ E2 [
  25.         $connection->send($ret ? 'ok' : 'fail');# B5 H  I" R6 \. p
  26.     };
    5 U0 a! t6 O5 \3 U* o8 D8 t
  27.     // ## 执行监听 ##' _# X8 A7 O5 R2 o( D6 D
  28.     $inner_text_worker->listen();
    : V* W: N4 C7 U: y2 ~# W
  29. };
    " K: ^( C( x8 J& G! |! p) ^, H
  30. // 新增加一个属性,用来保存uid到connection的映射
    + i; c' ]9 E% p3 F! q- H2 S$ u8 }
  31. $worker->uidConnections = array();
    ) S# o9 L7 F( R
  32. // 当有客户端发来消息时执行的回调函数
    1 V/ M" ^5 O: j2 L9 {
  33. $worker->onMessage = function($connection, $data)
    1 h- Y; X0 Q" O
  34. {
    ) d- k- J: T! t( }4 e
  35.     global $worker;
    2 E! `. n0 u2 B, E+ ?. j" @
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    : Y  x3 z. \0 v: @  e6 s* s
  37.     if(!isset($connection->uid))
    , a9 W7 b/ w( p- q# \
  38.     {
    ; c  k! a6 L( U0 K8 n% N, X
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)9 I5 ~" \4 Q# U( G( \3 a
  40.        $connection->uid = $data;
    * m1 [1 v$ \) z, m% M( X: C
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    , K' E; M4 c; _' T, T  |
  42.         * 实现针对特定uid推送数据
    * T" H  k3 u8 N1 D3 @* f
  43.         */
    - v3 _0 n/ C: r' o5 I5 m, Z  _$ r
  44.        $worker->uidConnections[$connection->uid] = $connection;/ a, a  }; m; r' I4 h+ M! b( u
  45.        return;' t7 l. ]' j1 d+ ^$ o: W
  46.     }  C" [: d9 ^0 F' {0 P
  47. };
    ' G# s. \2 A4 u$ Z
  48. - V3 i, }( N5 W, G1 q8 \* V) I/ h3 e( D
  49. // 当有客户端连接断开时8 @8 C* d1 I0 D" m7 c2 I
  50. $worker->onClose = function($connection)0 m1 }+ l* `  P9 m* B* m& k+ l
  51. {
    : e2 d; n/ c; K& N  c
  52.     global $worker;; Q4 X3 m0 Z# s( q# U5 f& N8 W7 j
  53.     if(isset($connection->uid))
    3 G+ m- a6 D$ y% u1 A6 V, h* _
  54.     {' p$ m5 G: A+ u0 u" l
  55.         // 连接断开时删除映射* @% W- q' Y+ I9 |1 t8 Z
  56.         unset($worker->uidConnections[$connection->uid]);
    ' s7 S8 l! [# z# B4 _& B
  57.     }
    , r) g) \# l4 B- u* W
  58. };
    , E# o- x' [4 J

  59. + D9 C6 j: O' s! f
  60. // 向所有验证的用户推送数据( Q. ^9 Z7 V, S: |) C
  61. function broadcast($message)
    * E. m% I# t' r6 T' \
  62. {9 Y4 y3 W8 p  F( [0 \1 ?6 Y
  63.    global $worker;. L! ~' ?/ z+ T* r5 M, W! A
  64.    foreach($worker->uidConnections as $connection)
    " M% r, j' I1 a
  65.    {$ A. j' r# Y7 W$ f0 s+ o
  66.         $connection->send($message);9 Z5 \% D- ?2 `
  67.    }  c; m# Y4 Q# u4 f5 v8 d4 y; b3 p
  68. }
    $ v! R& H& X9 _/ V- }: V
  69. # b& A' U" }1 ^% M: u% W$ V
  70. // 针对uid推送数据
    " e# ?( e+ f& B# ~3 K
  71. function sendMessageByUid($uid, $message): o) C  ^3 D" g: `3 n
  72. {$ U- Q/ ^# M' \$ w! O$ u1 X
  73.     global $worker;3 X0 z6 m/ N( E. c
  74.     if(isset($worker->uidConnections[$uid]))$ z  s- _, c7 q7 C, t. e  w+ |+ k
  75.     {
    + N8 s, z* _7 j* Y/ b' S$ f
  76.         $connection = $worker->uidConnections[$uid];& B6 R8 x7 R0 K! k5 }1 ^
  77.         $connection->send($message);
    % k+ C0 x7 Y; j) H) {3 K
  78.         return true;! M5 p9 r: l" l. e
  79.     }) x2 ?9 n5 |# q, u6 j4 R) X
  80.     return false;
    4 c. f  e  {, a; Q5 v
  81. }
    5 U# T5 x, o: ?4 |1 A

  82. 0 `# t7 s4 I% y  ]$ Q* |
  83. // 运行所有的worker' O- u6 o  N2 b* G0 x9 m
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');# k5 x9 e1 I$ c; G4 g
  2. ws.onopen = function(){6 b' F2 o, i7 e" v" H
  3.     var uid = 'uid1';
    : D* Z( A5 G- t3 N, X
  4.     ws.send(uid);
    * [/ Q$ {) A) \  Z
  5. };) o0 ~9 u; ~% n2 X
  6. ws.onmessage = function(e){
    & w& S' x3 f8 E# Q5 b1 R
  7.     alert(e.data);5 F3 t7 v4 s: o5 S4 j/ {, o/ e9 O! N
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口: K# \2 a' h3 n7 M: ~
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);5 Q/ S. z* L6 ^! `3 l
  3. // 推送的数据,包含uid字段,表示是给这个uid推送4 Y* C+ V# g& S9 ~4 ?' \7 H8 L
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');; i+ f1 g# P8 q. b- u& ~/ ?0 s
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符# F5 h, I; D- T1 |, ~/ [
  6. fwrite($client, json_encode($data)."\n");
    6 Y+ H8 ?* Q+ i# D- k: w) e
  7. // 读取推送结果6 n0 l* R# c8 g
  8. echo fread($client, 8192);
复制代码
4 {: w- X% q; P% i
; l  R) w6 c1 R7 e: H8 H
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-17 16:55 , Processed in 0.144911 second(s), 21 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!